[vlc-devel] [RFC v2 2/2] preparser: use the new executor API

Romain Vimont rom1v at videolabs.io
Wed Sep 2 11:06:55 CEST 2020


On Wed, Sep 02, 2020 at 08:28:14AM +0200, Steve Lhomme wrote:
> On 2020-09-01 18:13, Romain Vimont wrote:
> > Replace the background_worker by an executor.
> > ---
> >   src/preparser/preparser.c | 399 ++++++++++++++++++++++++--------------
> >   1 file changed, 251 insertions(+), 148 deletions(-)
> > 
> > diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c
> > index 723feb96dc..c78e956985 100644
> > --- a/src/preparser/preparser.c
> > +++ b/src/preparser/preparser.c
> > @@ -24,8 +24,9 @@
> >   #include <vlc_common.h>
> >   #include <vlc_atomic.h>
> > +#include <vlc_executor.h>
> > +#include <vlc_tick.h>
> 
> Why do we need this include? We never needed it before.

We don't. It's a leftover from an older version of the executor :)


> > -#include "misc/background_worker.h"
> >   #include "input/input_interface.h"
> >   #include "input/input_internal.h"
> >   #include "preparser.h"
> > @@ -35,222 +36,307 @@ struct input_preparser_t
> >   {
> >       vlc_object_t* owner;
> >       input_fetcher_t* fetcher;
> > -    struct background_worker* worker;
> > +    vlc_executor_t *executor;
> > +    vlc_tick_t default_timeout;
> >       atomic_bool deactivated;
> > +
> > +    vlc_mutex_t lock;
> > +    struct vlc_list submitted_tasks; /**< list of struct input_preparser_task */
> >   };
> > -typedef struct input_preparser_req_t
> > +struct input_preparser_task
> >   {
> > +    input_preparser_t *preparser;
> >       input_item_t *item;
> >       input_item_meta_request_option_t options;
> >       const input_preparser_callbacks_t *cbs;
> >       void *userdata;
> > -    vlc_atomic_rc_t rc;
> > -} input_preparser_req_t;
> > +    void *id;
> > +    vlc_tick_t timeout;
> > -typedef struct input_preparser_task_t
> > -{
> > -    input_preparser_req_t *req;
> > -    input_preparser_t* preparser;
> > -    int preparse_status;
> >       input_item_parser_id_t *parser;
> > -    atomic_int state;
> > -    atomic_bool done;
> > -} input_preparser_task_t;
> > -
> > -static input_preparser_req_t *ReqCreate(input_item_t *item,
> > -                                        input_item_meta_request_option_t options,
> > -                                        const input_preparser_callbacks_t *cbs,
> > -                                        void *userdata)
> > +
> > +    vlc_mutex_t lock;
> > +    vlc_cond_t cond_ended;
> > +    bool preparse_ended;
> > +    int preparse_status;
> > +    bool fetch_ended;
> > +
> > +    atomic_bool interrupted;
> > +
> > +    struct vlc_runnable runnable; /**< to be passed to the executor */
> > +
> > +    struct vlc_list node; /**< node of input_preparser_t.submitted_tasks */
> > +};
> > +
> > +static void RunnableRun(void *);
> > +
> > +static struct input_preparser_task *
> > +TaskNew(input_preparser_t *preparser, input_item_t *item,
> > +        input_item_meta_request_option_t options,
> > +        const input_preparser_callbacks_t *cbs, void *userdata,
> > +        void *id, vlc_tick_t timeout)
> >   {
> > -    input_preparser_req_t *req = malloc(sizeof(*req));
> > -    if (unlikely(!req))
> > +    assert(timeout >= 0);
> 
> Did you mean VLC_TICK_INVALID?

I keep the same meaning as the one exposed to the user:
 - -1: use the default timeout (already replaced by the default value in
       input_preparser_Push(), so not possible here)
 - 0: infinite timeout
 - >0: use the given timeout

> > +
> > +    struct input_preparser_task *task = malloc(sizeof(*task));
> > +    if (!task)
> >           return NULL;
> > -    req->item = item;
> > -    req->options = options;
> > -    req->cbs = cbs;
> > -    req->userdata = userdata;
> > -    vlc_atomic_rc_init(&req->rc);
> > +    task->preparser = preparser;
> > +    task->item = item;
> > +    task->options = options;
> > +    task->cbs = cbs;
> > +    task->userdata = userdata;
> > +    task->id = id;
> > +    task->timeout = timeout;
> >       input_item_Hold(item);
> > -    return req;
> > +    task->parser = NULL;
> > +    vlc_mutex_init(&task->lock);
> > +    vlc_cond_init(&task->cond_ended);
> > +    task->preparse_ended = false;
> > +    task->preparse_status = ITEM_PREPARSE_SKIPPED;
> > +    task->fetch_ended = false;
> > +
> > +    atomic_init(&task->interrupted, false);
> > +
> > +    task->runnable.run = RunnableRun;
> > +    task->runnable.userdata = task;
> > +
> > +    return task;
> >   }
> > -static void ReqHold(input_preparser_req_t *req)
> > +static void
> > +TaskDelete(struct input_preparser_task *task)
> >   {
> > -    vlc_atomic_rc_inc(&req->rc);
> > +    input_item_Release(task->item);
> > +    free(task);
> >   }
> > -static void ReqRelease(input_preparser_req_t *req)
> > +static void
> > +PreparserAddTask(input_preparser_t *preparser,
> > +                 struct input_preparser_task *task)
> >   {
> > -    if (vlc_atomic_rc_dec(&req->rc))
> > -    {
> > -        input_item_Release(req->item);
> > -        free(req);
> > -    }
> > +    vlc_mutex_lock(&preparser->lock);
> > +    vlc_list_append(&task->node, &preparser->submitted_tasks);
> > +    vlc_mutex_unlock(&preparser->lock);
> >   }
> > -static void OnParserEnded(input_item_t *item, int status, void *task_)
> > +static void
> > +PreparserRemoveTask(input_preparser_t *preparser,
> > +                    struct input_preparser_task *task)
> >   {
> > -    VLC_UNUSED(item);
> > -    input_preparser_task_t* task = task_;
> > -
> > -    atomic_store( &task->state, status );
> > -    atomic_store( &task->done, true );
> > -    background_worker_RequestProbe( task->preparser->worker );
> > +    vlc_mutex_lock(&preparser->lock);
> > +    vlc_list_remove(&task->node);
> > +    vlc_mutex_unlock(&preparser->lock);
> >   }
> > -static void OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
> > -                                 void *task_)
> > +static void
> > +FinishTask(struct input_preparser_task *task)
> >   {
> > -    VLC_UNUSED(item);
> > -    input_preparser_task_t* task = task_;
> > -    input_preparser_req_t *req = task->req;
> > +    if (task->cbs && task->cbs->on_preparse_ended)
> > +        task->cbs->on_preparse_ended(task->item, task->preparse_status,
> > +                                     task->userdata);
> > -    if (req->cbs && req->cbs->on_subtree_added)
> > -        req->cbs->on_subtree_added(req->item, subtree, req->userdata);
> > +    input_preparser_t *preparser = task->preparser;
> > +    PreparserRemoveTask(preparser, task);
> > +    TaskDelete(task);
> >   }
> > -static int PreparserOpenInput( void* preparser_, void* req_, void** out )
> > +static void
> > +OnParserEnded(input_item_t *item, int status, void *task_)
> >   {
> > -    input_preparser_t* preparser = preparser_;
> > -    input_preparser_req_t *req = req_;
> > -    input_preparser_task_t* task = malloc( sizeof *task );
> > -
> > -    if( unlikely( !task ) )
> > -        goto error;
> > -
> > -    static const input_item_parser_cbs_t cbs = {
> > -        .on_ended = OnParserEnded,
> > -        .on_subtree_added = OnParserSubtreeAdded,
> > -    };
> > -
> > -    atomic_init( &task->state, VLC_ETIMEOUT );
> > -    atomic_init( &task->done, false );
> > -
> > -    task->preparser = preparser_;
> > -    task->req = req;
> > -    task->preparse_status = -1;
> > -    task->parser = input_item_Parse( req->item, preparser->owner, &cbs,
> > -                                     task );
> > -    if( !task->parser )
> > -        goto error;
> > +    VLC_UNUSED(item);
> > +    struct input_preparser_task *task = task_;
> > -    *out = task;
> > +    if (atomic_load(&task->interrupted))
> > +        /*
> > +         * On interruption, the call to input_item_parser_id_Release() may
> > +         * trigger this "parser ended" callback. Ignore it.
> > +         */
> > +        return;
> > -    return VLC_SUCCESS;
> > +    vlc_mutex_lock(&task->lock);
> > +    assert(!task->preparse_ended);
> > +    task->preparse_status = status == VLC_SUCCESS ? ITEM_PREPARSE_DONE
> > +                                                  : ITEM_PREPARSE_FAILED;
> > +    task->preparse_ended = true;
> > +    vlc_mutex_unlock(&task->lock);
> > -error:
> > -    free( task );
> > -    if (req->cbs && req->cbs->on_preparse_ended)
> > -        req->cbs->on_preparse_ended(req->item, ITEM_PREPARSE_FAILED, req->userdata);
> > -    return VLC_EGENERIC;
> > +    vlc_cond_signal(&task->cond_ended);
> >   }
> > -static int PreparserProbeInput( void* preparser_, void* task_ )
> > +static void
> > +OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
> > +                     void *task_)
> >   {
> > -    input_preparser_task_t* task = task_;
> > -    return atomic_load( &task->done );
> > -    VLC_UNUSED( preparser_ );
> > +    VLC_UNUSED(item);
> > +    struct input_preparser_task *task = task_;
> > +
> > +    if (task->cbs && task->cbs->on_subtree_added)
> > +        task->cbs->on_subtree_added(task->item, subtree, task->userdata);
> >   }
> > -static void on_art_fetch_ended(input_item_t *item, bool fetched, void *userdata)
> > +static void
> > +OnArtFetchEnded(input_item_t *item, bool fetched, void *userdata)
> >   {
> >       VLC_UNUSED(item);
> >       VLC_UNUSED(fetched);
> > -    input_preparser_task_t *task = userdata;
> > -    input_preparser_req_t *req = task->req;
> > -    input_item_SetPreparsed(req->item, true);
> > +    struct input_preparser_task *task = userdata;
> > -    if (req->cbs && req->cbs->on_preparse_ended)
> > -        req->cbs->on_preparse_ended(req->item, task->preparse_status, req->userdata);
> > +    vlc_mutex_lock(&task->lock);
> > +    assert(!task->fetch_ended);
> > +    task->fetch_ended = true;
> > +    vlc_mutex_unlock(&task->lock);
> > -    ReqRelease(req);
> > -    free(task);
> > +    vlc_cond_signal(&task->cond_ended);
> >   }
> >   static const input_fetcher_callbacks_t input_fetcher_callbacks = {
> > -    .on_art_fetch_ended = on_art_fetch_ended,
> > +    .on_art_fetch_ended = OnArtFetchEnded,
> >   };
> > -static void PreparserCloseInput( void* preparser_, void* task_ )
> > +static void
> > +Parse(struct input_preparser_task *task, vlc_tick_t deadline, bool *timed_out)
> >   {
> > -    input_preparser_task_t* task = task_;
> > -    input_preparser_req_t *req = task->req;
> > +    *timed_out = false;
> > -    input_preparser_t* preparser = preparser_;
> > -    input_item_t* item = req->item;
> > +    static const input_item_parser_cbs_t cbs = {
> > +        .on_ended = OnParserEnded,
> > +        .on_subtree_added = OnParserSubtreeAdded,
> > +    };
> > -    int status;
> > -    switch( atomic_load( &task->state ) )
> > +    vlc_object_t *obj = task->preparser->owner;
> > +    task->parser = input_item_Parse(task->item, obj, &cbs, task);
> > +    if (!task->parser)
> >       {
> > -        case VLC_SUCCESS:
> > -            status = ITEM_PREPARSE_DONE;
> > -            break;
> > -        case VLC_ETIMEOUT:
> > -            status = ITEM_PREPARSE_TIMEOUT;
> > -            break;
> > -        default:
> > -            status = ITEM_PREPARSE_FAILED;
> > -            break;
> > +        task->preparse_status = ITEM_PREPARSE_FAILED;
> > +        return;
> >       }
> > -    input_item_parser_id_Release( task->parser );
> > -
> > -    if( preparser->fetcher && (req->options & META_REQUEST_OPTION_FETCH_ANY) )
> > +    /* Wait until the end of parsing */
> > +    vlc_mutex_lock(&task->lock);
> > +    if (deadline == VLC_TICK_INVALID)
> >       {
> > -        task->preparse_status = status;
> > -        ReqHold(task->req);
> > -        if (!input_fetcher_Push(preparser->fetcher, item,
> > -                                req->options & META_REQUEST_OPTION_FETCH_ANY,
> > -                                &input_fetcher_callbacks, task))
> > +        while (!task->preparse_ended)
> > +            vlc_cond_wait(&task->cond_ended, &task->lock);
> > +    }
> > +    else
> > +    {
> > +        bool timeout = false;
> > +        while (!task->preparse_ended && !timeout)
> > +            timeout =
> > +                vlc_cond_timedwait(&task->cond_ended, &task->lock, deadline);
> > +
> > +        if (timeout)
> >           {
> > -            return;
> > +            task->preparse_status = ITEM_PREPARSE_TIMEOUT;
> > +            *timed_out = true;
> >           }
> > -        ReqRelease(task->req);
> >       }
> > +    vlc_mutex_unlock(&task->lock);
> > -    free(task);
> > +    /* This call also interrupts the parsing if it is still running */
> > +    input_item_parser_id_Release(task->parser);
> > +}
> > +
> > +static void
> > +Fetch(struct input_preparser_task *task)
> > +{
> > +    input_fetcher_t *fetcher = task->preparser->fetcher;
> > +    if (!fetcher || !(task->options & META_REQUEST_OPTION_FETCH_ANY))
> > +        return;
> > +
> > +    int ret =
> > +        input_fetcher_Push(fetcher, task->item,
> > +                           task->options & META_REQUEST_OPTION_FETCH_ANY,
> > +                           &input_fetcher_callbacks, task);
> > +    if (ret != VLC_SUCCESS)
> > +        return;
> > -    input_item_SetPreparsed( item, true );
> > -    if (req->cbs && req->cbs->on_preparse_ended)
> > -        req->cbs->on_preparse_ended(req->item, status, req->userdata);
> > +    /* Wait until the end of fetching (fetching is not interruptible) */
> > +    vlc_mutex_lock(&task->lock);
> > +    while (!task->fetch_ended)
> > +        vlc_cond_wait(&task->cond_ended, &task->lock);
> > +    vlc_mutex_unlock(&task->lock);
> >   }
> > -static void ReqHoldVoid(void *item) { ReqHold(item); }
> > -static void ReqReleaseVoid(void *item) { ReqRelease(item); }
> > +static void
> > +RunnableRun(void *userdata)
> > +{
> > +    struct input_preparser_task *task = userdata;
> > +
> > +    vlc_tick_t deadline = task->timeout ? vlc_tick_now() + task->timeout
> > +                                        : VLC_TICK_INVALID;
> > +
> > +    if (atomic_load(&task->interrupted))
> > +        goto end;
> > +
> > +    bool timed_out;
> > +    Parse(task, deadline, &timed_out);
> > +
> > +    if (timed_out)
> > +        goto end;
> > +
> > +    if (atomic_load(&task->interrupted))
> > +        goto end;
> > +
> > +    Fetch(task);
> > +
> > +    if (atomic_load(&task->interrupted))
> > +        goto end;
> > +
> > +    input_item_SetPreparsed(task->item, true);
> > +
> > +end:
> > +    FinishTask(task);
> > +}
> > +
> > +static void
> > +Interrupt(struct input_preparser_task *task)
> > +{
> > +    assert(!atomic_load(&task->interrupted));
> > +    atomic_store(&task->interrupted, true);
> > +
> > +    /* Wake up the preparser cond_wait */
> > +    vlc_mutex_lock(&task->lock);
> > +    task->preparse_ended = true;
> > +    vlc_mutex_unlock(&task->lock);
> > +    vlc_cond_signal(&task->cond_ended);
> > +}
> >   input_preparser_t* input_preparser_New( vlc_object_t *parent )
> >   {
> >       input_preparser_t* preparser = malloc( sizeof *preparser );
> > +    if (!preparser)
> > +        return NULL;
> > -    struct background_worker_config conf = {
> > -        .default_timeout = VLC_TICK_FROM_MS(var_InheritInteger( parent, "preparse-timeout" )),
> > -        .max_threads = var_InheritInteger( parent, "preparse-threads" ),
> > -        .pf_start = PreparserOpenInput,
> > -        .pf_probe = PreparserProbeInput,
> > -        .pf_stop = PreparserCloseInput,
> > -        .pf_release = ReqReleaseVoid,
> > -        .pf_hold = ReqHoldVoid
> > -    };
> > -
> > -
> > -    if( likely( preparser ) )
> > -        preparser->worker = background_worker_New( preparser, &conf );
> > +    int max_threads = var_InheritInteger(parent, "preparse-threads");
> > +    if (max_threads < 1)
> > +        max_threads = 1;
> > -    if( unlikely( !preparser || !preparser->worker ) )
> > +    preparser->executor = vlc_executor_New(max_threads);
> > +    if (!preparser->executor)
> >       {
> > -        free( preparser );
> > +        free(preparser);
> >           return NULL;
> >       }
> > +    preparser->default_timeout =
> > +        VLC_TICK_FROM_MS(var_InheritInteger(parent, "preparse-timeout"));
> > +    if (preparser->default_timeout < 0)
> > +        preparser->default_timeout = 0;
> > +
> >       preparser->owner = parent;
> >       preparser->fetcher = input_fetcher_New( parent );
> >       atomic_init( &preparser->deactivated, false );
> > +    vlc_mutex_init(&preparser->lock);
> > +    vlc_list_init(&preparser->submitted_tasks);
> > +
> >       if( unlikely( !preparser->fetcher ) )
> >           msg_Warn( parent, "unable to create art fetcher" );
> > @@ -260,7 +346,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent )
> >   void input_preparser_Push( input_preparser_t *preparser,
> >       input_item_t *item, input_item_meta_request_option_t i_options,
> >       const input_preparser_callbacks_t *cbs, void *cbs_userdata,
> > -    int timeout, void *id )
> > +    int timeout_ms, void *id )
> >   {
> >       if( atomic_load( &preparser->deactivated ) )
> >           return;
> > @@ -287,14 +373,14 @@ void input_preparser_Push( input_preparser_t *preparser,
> >               return;
> >       }
> > -    struct input_preparser_req_t *req = ReqCreate(item, i_options,
> > -                                                  cbs, cbs_userdata);
> > +    vlc_tick_t timeout = timeout_ms == -1 ? preparser->default_timeout
> > +                                          : VLC_TICK_FROM_MS(timeout_ms);
> > +    struct input_preparser_task *task =
> > +        TaskNew(preparser, item, i_options, cbs, cbs_userdata, id, timeout);
> > -    if (background_worker_Push(preparser->worker, req, id, timeout))
> > -        if (req->cbs && cbs->on_preparse_ended)
> > -            cbs->on_preparse_ended(item, ITEM_PREPARSE_FAILED, cbs_userdata);
> > +    PreparserAddTask(preparser, task);
> > -    ReqRelease(req);
> > +    vlc_executor_Submit(preparser->executor, &task->runnable);
> >   }
> >   void input_preparser_fetcher_Push( input_preparser_t *preparser,
> > @@ -308,18 +394,35 @@ void input_preparser_fetcher_Push( input_preparser_t *preparser,
> >   void input_preparser_Cancel( input_preparser_t *preparser, void *id )
> >   {
> > -    background_worker_Cancel( preparser->worker, id );
> > +    vlc_mutex_lock(&preparser->lock);
> > +
> > +    struct input_preparser_task *task;
> > +    vlc_list_foreach(task, &preparser->submitted_tasks, node)
> > +    {
> > +        if (!id || task->id == id)
> > +        {
> > +            bool canceled =
> > +                vlc_executor_Cancel(preparser->executor, &task->runnable);
> > +            if (canceled)
> > +                FinishTask(task);
> > +            else
> > +                /* The task will be finished after run() */
> > +                Interrupt(task);
> > +        }
> > +    }
> > +
> > +    vlc_mutex_unlock(&preparser->lock);
> >   }
> >   void input_preparser_Deactivate( input_preparser_t* preparser )
> >   {
> >       atomic_store( &preparser->deactivated, true );
> > -    background_worker_Cancel( preparser->worker, NULL );
> > +    input_preparser_Cancel(preparser, NULL);
> >   }
> >   void input_preparser_Delete( input_preparser_t *preparser )
> >   {
> > -    background_worker_Delete( preparser->worker );
> > +    vlc_executor_Delete(preparser->executor);
> 
> It seems submitted_tasks is not emptied before release.

input_preparser_Cancel() is called by input_preparser_Deactivate(), which
IMO must be called (in practice it is) before input_preparser_Delete() in
the current preparser API (otherwise, it would crash).

(I would prefer not to touch the preparser API.)

> 
> >       if( preparser->fetcher )
> >           input_fetcher_Delete( preparser->fetcher );
> > -- 
> > 2.28.0
> > 
> > _______________________________________________
> > vlc-devel mailing list
> > To unsubscribe or modify your subscription options:
> > https://mailman.videolan.org/listinfo/vlc-devel
> > 
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel


More information about the vlc-devel mailing list