[vlc-devel] [RFC v2 1/2] executor: introduce new executor API

Romain Vimont rom1v at videolabs.io
Wed Sep 2 10:34:28 CEST 2020


On Wed, Sep 02, 2020 at 08:13:15AM +0200, Steve Lhomme wrote:
> On 2020-09-01 18:13, Romain Vimont wrote:
> > Introduce a new API to execute "runnables".
> > ---
> >   include/vlc_executor.h | 149 +++++++++++++++++++++++
> >   src/Makefile.am        |   1 +
> >   src/libvlccore.sym     |   4 +
> >   src/misc/executor.c    | 263 +++++++++++++++++++++++++++++++++++++++++
> >   4 files changed, 417 insertions(+)
> >   create mode 100644 include/vlc_executor.h
> >   create mode 100644 src/misc/executor.c
> > 
> > diff --git a/include/vlc_executor.h b/include/vlc_executor.h
> > new file mode 100644
> > index 0000000000..63dd187387
> > --- /dev/null
> > +++ b/include/vlc_executor.h
> > @@ -0,0 +1,149 @@
> > +/*****************************************************************************
> > + * vlc_executor.h
> > + *****************************************************************************
> > + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> > + *
> > + * This program is free software; you can redistribute it and/or modify it
> > + * under the terms of the GNU Lesser General Public License as published by
> > + * the Free Software Foundation; either version 2.1 of the License, or
> > + * (at your option) any later version.
> > + *
> > + * This program is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + * GNU Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public License
> > + * along with this program; if not, write to the Free Software Foundation,
> > + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> > + *****************************************************************************/
> > +
> > +#ifndef VLC_EXECUTOR_H
> > +#define VLC_EXECUTOR_H
> > +
> > +#include <vlc_common.h>
> > +#include <vlc_list.h>
> > +
> > +# ifdef __cplusplus
> > +extern "C" {
> > +# endif
> > +
> > +/** Executor type (opaque) */
> > +typedef struct vlc_executor vlc_executor_t;
> > +
> > +/**
> > + * A Runnable is intended to be run from another thread by an executor.
> > + */
> > +struct vlc_runnable {
> > +
> > +    /**
> > +     * This function is to be executed by a vlc_executor_t.
> > +     *
> > +     * It must implement the actions (arbitrarily long) to execute from an
> > +     * executor thread, synchronously. As soon as run() returns, the execution
> > +     * of this runnable is complete.
> > +     *
> > +     * After the runnable is submitted to an executor via
> > +     * vlc_executor_Submit(), the run() function is executed at most once (zero
> > +     * if the execution is canceled before it was started).
> > +     *
> > +     * It may not be NULL.
> 
> It MUST not be NULL. Otherwise there's no point in a runnable at all.

I thought that "it may not be NULL" meant "it is not possible that it is
NULL" (i.e. in French "il ne peut pas ĂȘtre NULL", but not "il peut ne
pas ĂȘtre NULL). It probably depends on the context.

Anyway, "it must not be NULL" is better.

> > +     *
> > +     * \param userdata the userdata provided to vlc_executor_Submit()
> > +     */
> > +    void (*run)(void *userdata);
> 
> Since the userdata is also part of the vlc_runnable, you may pass the
> vlc_runnable instead of the userdata. It allows more flexibilty in the
> future (for example if you want to do actions on the runnable, like run it
> again).

Good point. But now I realize that running a runnable again may cause
issues in regards to cancelation/deletion.

> > +
> > +    /**
> > +     * Userdata passed back to run().
> > +     */
> > +    void *userdata;
> > +
> > +    /* Private data used by the vlc_executor_t (do not touch) */
> > +    struct vlc_list node;
> > +};
> > +
> > +/**
> > + * Create a new executor.
> > + *
> > + * \param max_threads the maximum number of threads used to execute runnables
> > + * \return a pointer to a new executor, or NULL if an error occurred
> > + */
> > +VLC_API vlc_executor_t *
> > +vlc_executor_New(unsigned max_threads);
> > +
> > +/**
> > + * Delete an executor.
> > + *
> > + * Wait for all the threads to complete, and delete the executor instance.
> > + *
> > + * \param executor the executor
> > + */
> > +VLC_API void
> > +vlc_executor_Delete(vlc_executor_t *executor);
> > +
> > +/**
> > + * Submit a runnable for execution.
> > + *
> > + * The struct vlc_runnable is not copied, it must exist until the end of the
> > + * execution (the user is expected to embed it in its own task structure).
> > + *
> > + * Here is a simple example:
> > + *
> > + * \code{c}
> > + *  struct my_task {
> > + *      char *str;
> > + *      struct vlc_runnable runnable;
> > + *  };
> > + *
> > + *  static void Run(void *userdata)
> > + *  {
> > + *      struct my_task *task = userdata;
> 
> Here it would become a container_of(), which is still a cast but cleaner
> IMO.

Yes, or just:

    task = runnable->userdata;

> > + *
> > + *      printf("start of %s\n", task->str);
> > + *      vlc_tick_sleep(VLC_TICK_FROM_SEC(3)); // long action
> > + *      printf("end of %s\n", task->str);
> > + *
> > + *      free(task->str);
> > + *      free(task);
> > + *  }
> > + *
> > + *  void foo(vlc_executor_t *executor, const char *str)
> > + *  {
> > + *      // no error handling for brevity
> > + *      struct my_task *task = malloc(sizeof(*task));
> > + *      task->str = strdup(str);
> > + *      task->runnable.run = Run;
> > + *      task->runnable.userdata = task;
> 
> And in fact you don't need any userdata at all. You just wrap your runnable
> in a structure with the data you want. (as an object that would support the
> runnable interface)

I prefer to let the user decide how it stores its runnable. "userdata"
is quite "standard".

> > + *      vlc_executor_Submit(executor, &task->runnable);
> > + *  }
> > + * \endcode
> > + *
> > + * \param executor the executor
> > + * \param runnable the task to run
> > + */
> > +VLC_API void
> > +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable);
> > +
> > +/**
> > + * Cancel a runnable previously submitted.
> > + *
> > + * If this runnable is still queued (i.e. it has not be run yet), then dequeue
> > + * it so that it will never be run, and return true.
> > + *
> > + * Otherwise, this runnable has already been taken by an executor thread (it is
> > + * still running or is complete). In that case, do nothing, and return false.
> > + *
> > + * The result is undefined if the runnable has not been submitted.
> > + *
> > + * \param executor the executor
> > + * \param runnable the task to cancel
> > + * \return true if the runnable has been canceled before execution
> > + */
> > +VLC_API bool
> > +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable);
> 
> It might be worth writing somewhere that the executor will not free the
> vlc_runnable, it's the responsibility of the caller to do so (or the
> runnable itself, as in your example).

Yes, good point.

> > +
> > +# ifdef __cplusplus
> > +}
> > +# endif
> > +
> > + #endif
> > diff --git a/src/Makefile.am b/src/Makefile.am
> > index 6dc46ab3d6..5336fd7ac0 100644
> > --- a/src/Makefile.am
> > +++ b/src/Makefile.am
> > @@ -352,6 +352,7 @@ libvlccore_la_SOURCES = \
> >   	misc/actions.c \
> >   	misc/background_worker.c \
> >   	misc/background_worker.h \
> > +	misc/executor.c \
> >   	misc/md5.c \
> >   	misc/probe.c \
> >   	misc/rand.c \
> > diff --git a/src/libvlccore.sym b/src/libvlccore.sym
> > index 2b750873fc..6d72ee03e5 100644
> > --- a/src/libvlccore.sym
> > +++ b/src/libvlccore.sym
> > @@ -967,3 +967,7 @@ vlc_video_context_GetType
> >   vlc_video_context_GetPrivate
> >   vlc_video_context_Hold
> >   vlc_video_context_HoldDevice
> > +vlc_executor_New
> > +vlc_executor_Delete
> > +vlc_executor_Submit
> > +vlc_executor_Cancel
> > diff --git a/src/misc/executor.c b/src/misc/executor.c
> > new file mode 100644
> > index 0000000000..274ae3eb84
> > --- /dev/null
> > +++ b/src/misc/executor.c
> > @@ -0,0 +1,263 @@
> > +/*****************************************************************************
> > + * misc/executor.c
> > + *****************************************************************************
> > + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> > + *
> > + * This program is free software; you can redistribute it and/or modify it
> > + * under the terms of the GNU Lesser General Public License as published by
> > + * the Free Software Foundation; either version 2.1 of the License, or
> > + * (at your option) any later version.
> > + *
> > + * This program is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + * GNU Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public License
> > + * along with this program; if not, write to the Free Software Foundation,
> > + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> > + *****************************************************************************/
> > +
> > +#ifdef HAVE_CONFIG_H
> > +# include "config.h"
> > +#endif
> > +
> > +#include <vlc_executor.h>
> > +
> > +#include <vlc_atomic.h>
> > +#include <vlc_list.h>
> > +#include <vlc_threads.h>
> > +#include "libvlc.h"
> > +
> > +/**
> > + * An executor can spawn several threads.
> > + *
> > + * This structure contains the data specific to one thread.
> > + */
> > +struct vlc_executor_thread {
> > +    /** Node of vlc_executor.threads list */
> > +    struct vlc_list node;
> > +
> > +    /** The executor owning the thread */
> > +    vlc_executor_t *owner;
> > +
> > +    /** The system thread */
> > +    vlc_thread_t thread;
> > +
> > +    /** The current task executed by the thread, NULL if none */
> > +    struct vlc_runnable *current_task;
> > +};
> > +
> > +/**
> > + * The executor (also vlc_executor_t, exposed as opaque type in the public
> > + * header).
> > + */
> > +struct vlc_executor {
> > +    vlc_mutex_t lock;
> > +
> > +    /** Maximum number of threads to run the tasks */
> > +    unsigned max_threads;
> > +
> > +    /** List of active vlc_executor_thread */
> > +    struct vlc_list threads;
> > +
> > +    /** Thread count (in a separate field to quickly compare to max_threads) */
> > +    unsigned nthreads;
> > +
> > +    /* Number of tasks requested but not finished. */
> > +    unsigned unfinished;
> > +
> > +    /** Queue of vlc_runnable */
> > +    struct vlc_list queue;
> > +
> > +    /** Wait for the queue to be non-empty */
> > +    vlc_cond_t queue_wait;
> > +
> > +    /** True if executor deletion is requested */
> > +    bool closing;
> > +};
> > +
> > +static void
> > +QueuePush(vlc_executor_t *executor, struct vlc_runnable *runnable)
> > +{
> > +    vlc_mutex_assert(&executor->lock);
> > +
> > +    vlc_list_append(&runnable->node, &executor->queue);
> > +    vlc_cond_signal(&executor->queue_wait);
> > +}
> > +
> > +static struct vlc_runnable *
> > +QueueTake(vlc_executor_t *executor)
> > +{
> > +    vlc_mutex_assert(&executor->lock);
> > +
> > +    while (!executor->closing && vlc_list_is_empty(&executor->queue))
> > +        vlc_cond_wait(&executor->queue_wait, &executor->lock);
> > +
> > +    if (executor->closing)
> > +        return NULL;
> > +
> > +    struct vlc_runnable *runnable =
> > +        vlc_list_first_entry_or_null(&executor->queue, struct vlc_runnable,
> > +                                     node);
> > +    assert(runnable);
> > +    vlc_list_remove(&runnable->node);
> > +
> > +    /* Set links to NULL to know that it has been taken by a thread in
> > +     * vlc_executor_Cancel() */
> > +    runnable->node.prev = runnable->node.next = NULL;
> > +
> > +    return runnable;
> > +}
> > +
> > +static void *
> > +ThreadRun(void *userdata)
> > +{
> > +    struct vlc_executor_thread *thread = userdata;
> > +    vlc_executor_t *executor = thread->owner;
> > +
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    for (;;)
> > +    {
> > +        struct vlc_runnable *runnable = QueueTake(executor);
> > +        if (!runnable)
> > +            /* The executor is closing, terminate this thread */
> > +            break;
> > +
> > +        thread->current_task = runnable;
> > +        vlc_mutex_unlock(&executor->lock);
> > +
> > +        /* Execute the user-provided runnable, without the executor lock */
> > +        runnable->run(runnable->userdata);
> > +
> > +        vlc_mutex_lock(&executor->lock);
> > +        thread->current_task = NULL;
> > +
> > +        assert(executor->unfinished > 0);
> > +        --executor->unfinished;
> > +    }
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +
> > +    return NULL;
> > +}
> > +
> > +static int
> > +SpawnThread(vlc_executor_t *executor)
> > +{
> > +    assert(executor->nthreads < executor->max_threads);
> > +
> > +    struct vlc_executor_thread *thread = malloc(sizeof(*thread));
> > +    if (!thread)
> > +        return VLC_ENOMEM;
> > +
> > +    thread->owner = executor;
> > +    thread->current_task = NULL;
> > +
> > +    if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
> > +    {
> > +        free(thread);
> > +        return VLC_EGENERIC;
> > +    }
> > +
> > +    executor->nthreads++;
> > +    vlc_list_append(&thread->node, &executor->threads);
> > +
> > +    return VLC_SUCCESS;
> > +}
> > +
> > +vlc_executor_t *
> > +vlc_executor_New(unsigned max_threads)
> > +{
> > +    assert(max_threads);
> > +    vlc_executor_t *executor = malloc(sizeof(*executor));
> > +    if (!executor)
> > +        return NULL;
> > +
> > +    vlc_mutex_init(&executor->lock);
> > +
> > +    executor->max_threads = max_threads;
> > +    executor->nthreads = 0;
> > +    executor->unfinished = 0;
> > +
> > +    vlc_list_init(&executor->threads);
> > +    vlc_list_init(&executor->queue);
> > +
> > +    vlc_cond_init(&executor->queue_wait);
> > +
> > +    executor->closing = false;
> > +
> > +    /* Create one thread on init so that vlc_executor_Submit() may never fail */
> > +    int ret = SpawnThread(executor);
> > +    if (ret != VLC_SUCCESS)
> > +    {
> > +        free(executor);
> > +        return NULL;
> > +    }
> > +
> > +    return executor;
> > +}
> > +
> > +void
> > +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    assert(!executor->closing);
> 
> It might be legitimate to try to submit a task even if the executor is
> closing. For example if a runnable is trying to submit another runnable. It
> should have to check with some higher power if it's allowed to do so or not
> (atomically). So at least you should just return here.

In fact, just returning here will still cause problems: once submitted,
a runnable is either canceled (with vlc_executor_Cancel() returning
VLC_SUCCESS) or run (the run() callback is called).

This allows the caller to know if the task is "finished" (so that it can
safely delete its resources).

If a _Submit() is ignored, then the runnable will not be canceled and
will not be run, so resources will leak.

I will think about it. I would like not to return a bool from _Submit()
just for that. I don't want to add a separate "deactivate" function for
closing before _Delete(). Maybe the threads should continue to dequeue
the pending tasks until the queue is empty in case a runnable queues
another one after "closing"?

> > +
> > +    QueuePush(executor, runnable);
> > +
> > +    if (++executor->unfinished > executor->nthreads
> > +            && executor->nthreads < executor->max_threads)
> > +        /* If it fails, this is not an error, there is at least one thread */
> > +        SpawnThread(executor);
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +}
> > +
> > +bool
> > +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    /* Either both prev and next are set, either both are NULL */
> > +    assert(!!runnable->node.prev == !!runnable->node.next);
> > +
> > +    bool in_queue = runnable->node.prev;
> > +    if (in_queue)
> > +        vlc_list_remove(&runnable->node);
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +
> > +    return in_queue;
> > +}
> > +
> > +void
> > +vlc_executor_Delete(vlc_executor_t *executor)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    executor->closing = true;
> > +
> > +    /* All the tasks must be canceled on delete */
> > +    assert(vlc_list_is_empty(&executor->queue));
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +
> > +    /* "closing" is now true, this will wake up threads */
> > +    vlc_cond_broadcast(&executor->queue_wait);
> > +
> > +    /* The threads list may not be written at this point, so it is safe to read
> > +     * it without mutex locked (the mutex must be released to join the
> > +     * threads). */
> > +
> > +    struct vlc_executor_thread *thread;
> > +    vlc_list_foreach(thread, &executor->threads, node)
> > +    {
> > +        vlc_join(thread->thread, NULL);
> > +        free(thread);
> > +    }
> > +
> > +    free(executor);
> > +}
> > -- 
> > 2.28.0
> > 
> > _______________________________________________
> > vlc-devel mailing list
> > To unsubscribe or modify your subscription options:
> > https://mailman.videolan.org/listinfo/vlc-devel
> > 
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel


More information about the vlc-devel mailing list