[vlc-devel] [PATCH 4/6] fetcher: use vlc_executor_t

Thomas Guillem thomas at gllm.fr
Tue Sep 8 13:18:09 CEST 2020



On Tue, Sep 8, 2020, at 12:36, Alexandre Janniaux wrote:
> Hi,
> 
> I like the end result very much, though the race condition
> on Cancel is a bit inconvenient. I don't have any better way
> for now anyway.
> 
> Just a few comments below:
> 
> On Mon, Sep 07, 2020 at 05:40:09PM +0200, Romain Vimont wrote:
> > Replace the background workers by executors.
> > ---
> >  src/preparser/fetcher.c | 379 ++++++++++++++++++++++------------------
> >  1 file changed, 207 insertions(+), 172 deletions(-)
> >
> > diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
> > index 5bb4aa1cef..d237ded782 100644
> > --- a/src/preparser/fetcher.c
> > +++ b/src/preparser/fetcher.c
> > @@ -31,43 +31,118 @@
> >  #include <vlc_threads.h>
> >  #include <vlc_memstream.h>
> >  #include <vlc_meta_fetcher.h>
> > +#include <vlc_executor.h>
> >
> >  #include "art.h"
> >  #include "libvlc.h"
> >  #include "fetcher.h"
> >  #include "input/input_interface.h"
> > -#include "misc/background_worker.h"
> >  #include "misc/interrupt.h"
> >
> >  struct input_fetcher_t {
> > -    struct background_worker* local;
> > -    struct background_worker* network;
> > -    struct background_worker* downloader;
> > +    vlc_executor_t *executor_local;
> > +    vlc_executor_t *executor_network;
> > +    vlc_executor_t *executor_downloader;
> 
> I'm ok with it, but we probably want to have a single executor
> for all of them in the very end, don't we?

No, you don't want to block local parsing because all threads from the network executor are waiting for I/O.

> 
> It's probably a premature notice though, because we probably
> need a lot of refactor before it becomes of any use.
> 
> If I'm not correct, maybe this refactor could also add some
> documentation if you had the time to understand why there are
> three of them?
> 
> >
> >      vlc_dictionary_t album_cache;
> >      vlc_object_t* owner;
> > +
> >      vlc_mutex_t lock;
> > +    struct vlc_list submitted_tasks; /**< list of struct task */
> >  };
> >
> > -struct fetcher_request {
> > +struct task {
> > +    input_fetcher_t *fetcher;
> > +    vlc_executor_t *executor;
> >      input_item_t* item;
> > -    vlc_atomic_rc_t rc;
> >      int options;
> >      const input_fetcher_callbacks_t *cbs;
> >      void *userdata;
> > +
> > +    vlc_interrupt_t interrupt;
> > +
> > +    struct vlc_runnable runnable; /**< to be passed to the executor */
> > +    struct vlc_list node; /**< node of input_fetcher_t.submitted_tasks */
> >  };
> >
> > -struct fetcher_thread {
> > -    void (*pf_worker)( input_fetcher_t*, struct fetcher_request* );
> > +static void RunDownloader(void *);
> > +static void RunSearchLocal(void *);
> > +static void RunSearchNetwork(void *);
> > +
> > +static struct task *
> > +TaskNew(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
> > +        input_item_meta_request_option_t options,
> > +        const input_fetcher_callbacks_t *cbs, void *userdata)
> > +{
> > +    struct task *task = malloc(sizeof(*task));
> > +    if (!task)
> > +        return NULL;
> > +
> > +    task->fetcher = fetcher;
> > +    task->executor = executor;
> > +    task->item = item;
> > +    task->options = options;
> > +    task->cbs = cbs;
> > +    task->userdata = userdata;
> >
> > -    struct background_worker* worker;
> > -    struct fetcher_request* req;
> > -    input_fetcher_t* fetcher;
> > +    vlc_interrupt_init(&task->interrupt);
> >
> > -    vlc_interrupt_t interrupt;
> > -    vlc_thread_t thread;
> > -    atomic_bool active;
> > -};
> > +    input_item_Hold(item);
> > +
> > +    if (executor == fetcher->executor_local)
> > +        task->runnable.run = RunSearchLocal;
> > +    else if (executor == fetcher->executor_network)
> > +        task->runnable.run = RunSearchNetwork;
> > +    else
> > +    {
> > +        assert(executor == fetcher->executor_downloader);
> > +        task->runnable.run = RunDownloader;
> > +    }
> > +
> > +    task->runnable.userdata = task;
> > +
> > +    return task;
> > +}
> > +
> > +static void
> > +TaskDelete(struct task *task)
> > +{
> > +    input_item_Release(task->item);
> > +    vlc_interrupt_deinit(&task->interrupt);
> > +    free(task);
> > +}
> > +
> > +static void
> > +FetcherAddTask(input_fetcher_t *fetcher, struct task *task)
> > +{
> > +    vlc_mutex_lock(&fetcher->lock);
> > +    vlc_list_append(&task->node, &fetcher->submitted_tasks);
> > +    vlc_mutex_unlock(&fetcher->lock);
> > +}
> > +
> > +static void
> > +FetcherRemoveTask(input_fetcher_t *fetcher, struct task *task)
> > +{
> > +    vlc_mutex_lock(&fetcher->lock);
> > +    vlc_list_remove(&task->node);
> > +    vlc_mutex_unlock(&fetcher->lock);
> > +}
> > +
> > +static int
> > +Submit(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
> > +       input_item_meta_request_option_t options,
> > +       const input_fetcher_callbacks_t *cbs, void *userdata)
> > +{
> > +    struct task *task =
> > +        TaskNew(fetcher, executor, item, options, cbs, userdata);
> > +    if (!task)
> > +        return VLC_ENOMEM;
> > +
> > +    FetcherAddTask(fetcher, task);
> > +    vlc_executor_Submit(task->executor, &task->runnable);
> > +
> > +    return VLC_SUCCESS;
> > +}
> >
> >  static char* CreateCacheKey( input_item_t* item )
> >  {
> > @@ -191,13 +266,13 @@ static int SearchArt( input_fetcher_t* fetcher, input_item_t* item, int scope)
> >      return CheckArt( item );
> >  }
> >
> > -static int SearchByScope( input_fetcher_t* fetcher,
> > -    struct fetcher_request* req, int scope )
> > +static int SearchByScope(struct task *task, int scope)
> >  {
> > -    input_item_t* item = req->item;
> > +    input_fetcher_t *fetcher = task->fetcher;
> > +    input_item_t* item = task->item;
> >
> >      if( CheckMeta( item ) &&
> > -        InvokeModule( fetcher, req->item, scope, "meta fetcher" ) )
> > +        InvokeModule( fetcher, item, scope, "meta fetcher" ) )
> >      {
> >          return VLC_EGENERIC;
> >      }
> > @@ -208,26 +283,32 @@ static int SearchByScope( input_fetcher_t* fetcher,
> >          ! input_FindArtInCache( item )             ||
> >          ! SearchArt( fetcher, item, scope ) )
> >      {
> > -        AddAlbumCache( fetcher, req->item, false );
> > -        if( !background_worker_Push( fetcher->downloader, req, NULL, 0 ) )
> > +        AddAlbumCache( fetcher, task->item, false );
> > +        int ret = Submit(fetcher, fetcher->executor_downloader, item,
> > +                         task->options, task->cbs, task->userdata);
> > +        if (ret == VLC_SUCCESS)
> >              return VLC_SUCCESS;
> >      }
> >
> >      return VLC_EGENERIC;
> >  }
> >
> > -static void NotifyArtFetchEnded( struct fetcher_request* req, bool fetched )
> > +static void NotifyArtFetchEnded(struct task *task, bool fetched)
> >  {
> > -    if (req->cbs && req->cbs->on_art_fetch_ended)
> > -        req->cbs->on_art_fetch_ended(req->item, fetched, req->userdata);
> > +    if (task->cbs && task->cbs->on_art_fetch_ended)
> > +        task->cbs->on_art_fetch_ended(task->item, fetched, task->userdata);
> >  }
> >
> > -static void Downloader( input_fetcher_t* fetcher,
> > -    struct fetcher_request* req )
> > +static void RunDownloader(void *userdata)
> >  {
> > -    ReadAlbumCache( fetcher, req->item );
> > +    struct task *task = userdata;
> > +    input_fetcher_t *fetcher = task->fetcher;
> > +
> > +    vlc_interrupt_set(&task->interrupt);
> > +
> > +    ReadAlbumCache( fetcher, task->item );
> >
> > -    char *psz_arturl = input_item_GetArtURL( req->item );
> > +    char *psz_arturl = input_item_GetArtURL( task->item );
> >      if( !psz_arturl )
> >          goto error;
> >
> > @@ -266,151 +347,85 @@ static void Downloader( input_fetcher_t* fetcher,
> >          goto error;
> >      }
> >
> > -    input_SaveArt( fetcher->owner, req->item, output_stream.ptr,
> > +    input_SaveArt( fetcher->owner, task->item, output_stream.ptr,
> >                     output_stream.length, NULL );
> >
> >      free( output_stream.ptr );
> > -    AddAlbumCache( fetcher, req->item, true );
> > +    AddAlbumCache( fetcher, task->item, true );
> >
> >  out:
> > +    vlc_interrupt_set(NULL);
> > +
> >      if( psz_arturl )
> >      {
> > -        var_SetAddress( fetcher->owner, "item-change", req->item );
> > -        input_item_SetArtFetched( req->item, true );
> > +        var_SetAddress( fetcher->owner, "item-change", task->item );
> > +        input_item_SetArtFetched( task->item, true );
> >      }
> >
> >      free( psz_arturl );
> > -    NotifyArtFetchEnded(req, psz_arturl != NULL);
> > +    NotifyArtFetchEnded(task, psz_arturl != NULL);
> > +    FetcherRemoveTask(fetcher, task);
> > +    TaskDelete(task);
> >      return;
> >
> >  error:
> > +    vlc_interrupt_set(NULL);
> > +
> >      FREENULL( psz_arturl );
> > +    NotifyArtFetchEnded(task, false);
> > +    FetcherRemoveTask(fetcher, task);
> > +    TaskDelete(task);
> >      goto out;
> >  }
> >
> > -static void SearchLocal( input_fetcher_t* fetcher, struct fetcher_request* req )
> > +static void RunSearchLocal(void *userdata)
> >  {
> > -    if( SearchByScope( fetcher, req, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
> > -        return; /* done */
> > +    struct task *task = userdata;
> > +    input_fetcher_t *fetcher = task->fetcher;
> > +
> > +    vlc_interrupt_set(&task->interrupt);
> > +
> > +    if( SearchByScope( task, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
> > +        goto end; /* done */
> >
> >      if( var_InheritBool( fetcher->owner, "metadata-network-access" ) ||
> > -        req->options & META_REQUEST_OPTION_FETCH_NETWORK )
> > +        task->options & META_REQUEST_OPTION_FETCH_NETWORK )
> >      {
> > -        if( background_worker_Push( fetcher->network, req, NULL, 0 ) )
> > -            NotifyArtFetchEnded(req, false);
> > +        int ret = Submit(fetcher, fetcher->executor_network, task->item,
> > +                         task->options, task->cbs, task->userdata);
> > +        if (ret != VLC_SUCCESS)
> > +            NotifyArtFetchEnded(task, false);
> >      }
> >      else
> >      {
> > -        input_item_SetArtNotFound( req->item, true );
> > -        NotifyArtFetchEnded(req, false);
> > +        input_item_SetArtNotFound( task->item, true );
> > +        NotifyArtFetchEnded(task, false);
> >      }
> > -}
> >
> > -static void SearchNetwork( input_fetcher_t* fetcher, struct fetcher_request* req )
> > -{
> > -    if( SearchByScope( fetcher, req, FETCHER_SCOPE_NETWORK ) )
> > -    {
> > -        input_item_SetArtNotFound( req->item, true );
> > -        NotifyArtFetchEnded(req, false);
> > -    }
> > -}
> > -
> > -static void RequestRelease( void* req_ )
> > -{
> > -    struct fetcher_request* req = req_;
> > +end:
> > +    vlc_interrupt_set(NULL);
> >
> > -    if( !vlc_atomic_rc_dec( &req->rc ) )
> > -        return;
> > -
> > -    input_item_Release( req->item );
> > -    free( req );
> > -}
> > -
> > -static void RequestHold( void* req_ )
> > -{
> > -    struct fetcher_request* req = req_;
> > -    vlc_atomic_rc_inc( &req->rc );
> > -}
> > -
> > -static void* FetcherThread( void* handle )
> > -{
> > -    struct fetcher_thread* th = handle;
> > -    vlc_interrupt_set( &th->interrupt );
> > -
> > -    th->pf_worker( th->fetcher, th->req );
> > -
> > -    atomic_store( &th->active, false );
> > -    background_worker_RequestProbe( th->worker );
> > -    return NULL;
> > +    FetcherRemoveTask(fetcher, task);
> > +    TaskDelete(task);
> >  }
> >
> > -static int StartWorker( input_fetcher_t* fetcher,
> > -    void( *pf_worker )( input_fetcher_t*, struct fetcher_request* ),
> > -    struct background_worker* bg, struct fetcher_request* req, void** handle )
> > +static void RunSearchNetwork(void *userdata)
> >  {
> > -    struct fetcher_thread* th = malloc( sizeof *th );
> > -
> > -    if( unlikely( !th ) )
> > -        return VLC_ENOMEM;
> > -
> > -    th->req = req;
> > -    th->worker = bg;
> > -    th->fetcher = fetcher;
> > -    th->pf_worker = pf_worker;
> > +    struct task *task = userdata;
> >
> > -    vlc_interrupt_init( &th->interrupt );
> > -    atomic_init( &th->active, true );
> > +    vlc_interrupt_set(&task->interrupt);
> >
> > -    if( !vlc_clone( &th->thread, FetcherThread, th, VLC_THREAD_PRIORITY_LOW ) )
> > +    if( SearchByScope( task, FETCHER_SCOPE_NETWORK ) != VLC_SUCCESS )
> >      {
> > -        *handle = th;
> > -        return VLC_SUCCESS;
> > +        input_item_SetArtNotFound( task->item, true );
> > +        NotifyArtFetchEnded(task, false);
> >      }
> >
> > -    vlc_interrupt_deinit( &th->interrupt );
> > -    free( th );
> > -    return VLC_EGENERIC;
> > -}
> > +    vlc_interrupt_set(NULL);
> >
> > -static int ProbeWorker( void* fetcher_, void* th_ )
> > -{
> > -    return !atomic_load( &((struct fetcher_thread*)th_)->active );
> > -    VLC_UNUSED( fetcher_ );
> > -}
> > -
> > -static void CloseWorker( void* fetcher_, void* th_ )
> > -{
> > -    struct fetcher_thread* th = th_;
> > -    VLC_UNUSED( fetcher_ );
> > -
> > -    vlc_interrupt_kill( &th->interrupt );
> > -    vlc_join( th->thread, NULL );
> > -    vlc_interrupt_deinit( &th->interrupt );
> > -    free( th );
> > -}
> > -
> > -#define DEF_STARTER(name, worker) \
> > -static int Start ## name( void* fetcher_, void* req_, void** out ) { \
> > -    input_fetcher_t* fetcher = fetcher_; \
> > -    return StartWorker( fetcher, name, worker, req_, out ); }
> > -
> > -DEF_STARTER(  SearchLocal, fetcher->local )
> > -DEF_STARTER(SearchNetwork, fetcher->network )
> > -DEF_STARTER(   Downloader, fetcher->downloader )
> > -
> > -static void WorkerInit( input_fetcher_t* fetcher,
> > -    struct background_worker** worker, int( *starter )( void*, void*, void** ) )
> > -{
> > -    struct background_worker_config conf = {
> > -        .default_timeout = 0,
> > -        .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ),
> > -        .pf_start = starter,
> > -        .pf_probe = ProbeWorker,
> > -        .pf_stop = CloseWorker,
> > -        .pf_release = RequestRelease,
> > -        .pf_hold = RequestHold };
> > -
> > -    *worker = background_worker_New( fetcher, &conf );
> > +    input_fetcher_t *fetcher = task->fetcher;
> > +    FetcherRemoveTask(fetcher, task);
> > +    TaskDelete(task);
> >  }
> >
> >  input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
> > @@ -420,65 +435,85 @@ input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
> >      if( unlikely( !fetcher ) )
> >          return NULL;
> >
> > -    fetcher->owner = owner;
> > -
> > -    WorkerInit( fetcher, &fetcher->local, StartSearchLocal );
> > -    WorkerInit( fetcher, &fetcher->network, StartSearchNetwork );
> > -    WorkerInit( fetcher, &fetcher->downloader, StartDownloader );
> > +    int max_threads = var_InheritInteger(owner, "fetch-art-threads");
> > +    if (max_threads < 1)
> > +        max_threads = 1;
> >
> > -    if( unlikely( !fetcher->local || !fetcher->network || !fetcher->downloader ) )
> > +    fetcher->executor_local = vlc_executor_New(max_threads);
> > +    if (!fetcher->executor_local)
> >      {
> > -        if( fetcher->local )
> > -            background_worker_Delete( fetcher->local );
> > -
> > -        if( fetcher->network )
> > -            background_worker_Delete( fetcher->network );
> > +        free(fetcher);
> > +        return NULL;
> > +    }
> >
> > -        if( fetcher->downloader )
> > -            background_worker_Delete( fetcher->downloader );
> > +    fetcher->executor_network = vlc_executor_New(max_threads);
> > +    if (!fetcher->executor_network)
> > +    {
> > +        vlc_executor_Delete(fetcher->executor_local);
> > +        free(fetcher);
> > +        return NULL;
> > +    }
> >
> > -        free( fetcher );
> > +    fetcher->executor_downloader = vlc_executor_New(max_threads);
> > +    if (!fetcher->executor_downloader)
> > +    {
> > +        vlc_executor_Delete(fetcher->executor_network);
> > +        vlc_executor_Delete(fetcher->executor_local);
> > +        free(fetcher);
> >          return NULL;
> >      }
> >
> > -    vlc_mutex_init( &fetcher->lock );
> > +    fetcher->owner = owner;
> > +
> > +    vlc_mutex_init(&fetcher->lock);
> > +    vlc_list_init(&fetcher->submitted_tasks);
> > +
> >      vlc_dictionary_init( &fetcher->album_cache, 0 );
> >
> >      return fetcher;
> >  }
> >
> > -int input_fetcher_Push( input_fetcher_t* fetcher, input_item_t* item,
> > +int input_fetcher_Push(input_fetcher_t* fetcher, input_item_t* item,
> >      input_item_meta_request_option_t options,
> > -    const input_fetcher_callbacks_t *cbs, void *cbs_userdata )
> > +    const input_fetcher_callbacks_t *cbs, void *cbs_userdata)
> >  {
> >      assert(options & META_REQUEST_OPTION_FETCH_ANY);
> > -    struct fetcher_request* req = malloc( sizeof *req );
> >
> > -    if( unlikely( !req ) )
> > -        return VLC_ENOMEM;
> > +    vlc_executor_t *executor = options & META_REQUEST_OPTION_FETCH_LOCAL
> > +                             ? fetcher->executor_local
> > +                             : fetcher->executor_network;
> >
> > -    req->item = item;
> > -    req->options = options;
> > -    req->cbs = cbs;
> > -    req->userdata = cbs_userdata;
> > +    return Submit(fetcher, executor, item, options, cbs, cbs_userdata);
> > +}
> >
> > -    vlc_atomic_rc_init( &req->rc );
> > -    input_item_Hold( item );
> > +static void
> > +CancelAllTasks(input_fetcher_t *fetcher)
> > +{
> > +    vlc_mutex_lock(&fetcher->lock);
> >
> > -    struct background_worker* worker =
> > -        options & META_REQUEST_OPTION_FETCH_LOCAL ? fetcher->local : fetcher->network;
> > -    if( background_worker_Push( worker, req, NULL, 0 ) )
> > -        NotifyArtFetchEnded(req, false);
> > +    struct task *task;
> > +    vlc_list_foreach(task, &fetcher->submitted_tasks, node)
> > +    {
> > +        bool canceled = vlc_executor_Cancel(task->executor, &task->runnable);
> > +        if (canceled)
> > +        {
> > +            NotifyArtFetchEnded(task, false);
> > +            vlc_list_remove(&task->node);
> > +            TaskDelete(task);
> > +        }
> > +        /* Otherwise, the task will be finished after run() */
> 
> you could add “finished and destroyed” just to be exhaustive
> and readable.
> 
> > +    }
> >
> > -    RequestRelease( req );
> > -    return VLC_SUCCESS;
> > +    vlc_mutex_unlock(&fetcher->lock);
> >  }
> >
> >  void input_fetcher_Delete( input_fetcher_t* fetcher )
> >  {
> > -    background_worker_Delete( fetcher->local );
> > -    background_worker_Delete( fetcher->network );
> > -    background_worker_Delete( fetcher->downloader );
> > +    CancelAllTasks(fetcher);
> > +
> > +    vlc_executor_Delete(fetcher->executor_local);
> > +    vlc_executor_Delete(fetcher->executor_network);
> > +    vlc_executor_Delete(fetcher->executor_downloader);
> >
> >      vlc_dictionary_clear( &fetcher->album_cache, FreeCacheEntry, NULL );
> >      free( fetcher );
> > --
> > 2.28.0
> >
> > _______________________________________________
> > vlc-devel mailing list
> > To unsubscribe or modify your subscription options:
> > https://mailman.videolan.org/listinfo/vlc-devel
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel


More information about the vlc-devel mailing list