[vlc-devel] [PATCH 4/6] fetcher: use vlc_executor_t

Alexandre Janniaux ajanni at videolabs.io
Tue Sep 8 12:36:09 CEST 2020


Hi,

I like the end result very much, though the race condition
on Cancel is a bit inconvenient. I don't have any better way
for now anyway.

Just a few comments below:

On Mon, Sep 07, 2020 at 05:40:09PM +0200, Romain Vimont wrote:
> Replace the background workers by executors.
> ---
>  src/preparser/fetcher.c | 379 ++++++++++++++++++++++------------------
>  1 file changed, 207 insertions(+), 172 deletions(-)
>
> diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
> index 5bb4aa1cef..d237ded782 100644
> --- a/src/preparser/fetcher.c
> +++ b/src/preparser/fetcher.c
> @@ -31,43 +31,118 @@
>  #include <vlc_threads.h>
>  #include <vlc_memstream.h>
>  #include <vlc_meta_fetcher.h>
> +#include <vlc_executor.h>
>
>  #include "art.h"
>  #include "libvlc.h"
>  #include "fetcher.h"
>  #include "input/input_interface.h"
> -#include "misc/background_worker.h"
>  #include "misc/interrupt.h"
>
>  struct input_fetcher_t {
> -    struct background_worker* local;
> -    struct background_worker* network;
> -    struct background_worker* downloader;
> +    vlc_executor_t *executor_local;
> +    vlc_executor_t *executor_network;
> +    vlc_executor_t *executor_downloader;

I'm ok with it, but we probably want to have a single executor
for all of them in the very end, don't we?

It's probably a premature notice though, because we probably
need a lot of refactor before it becomes of any use.

If I'm not correct, maybe this refactor could also add some
documentation if you had the time to understand why there are
three of them?

>
>      vlc_dictionary_t album_cache;
>      vlc_object_t* owner;
> +
>      vlc_mutex_t lock;
> +    struct vlc_list submitted_tasks; /**< list of struct task */
>  };
>
> -struct fetcher_request {
> +struct task {
> +    input_fetcher_t *fetcher;
> +    vlc_executor_t *executor;
>      input_item_t* item;
> -    vlc_atomic_rc_t rc;
>      int options;
>      const input_fetcher_callbacks_t *cbs;
>      void *userdata;
> +
> +    vlc_interrupt_t interrupt;
> +
> +    struct vlc_runnable runnable; /**< to be passed to the executor */
> +    struct vlc_list node; /**< node of input_fetcher_t.submitted_tasks */
>  };
>
> -struct fetcher_thread {
> -    void (*pf_worker)( input_fetcher_t*, struct fetcher_request* );
> +static void RunDownloader(void *);
> +static void RunSearchLocal(void *);
> +static void RunSearchNetwork(void *);
> +
> +static struct task *
> +TaskNew(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
> +        input_item_meta_request_option_t options,
> +        const input_fetcher_callbacks_t *cbs, void *userdata)
> +{
> +    struct task *task = malloc(sizeof(*task));
> +    if (!task)
> +        return NULL;
> +
> +    task->fetcher = fetcher;
> +    task->executor = executor;
> +    task->item = item;
> +    task->options = options;
> +    task->cbs = cbs;
> +    task->userdata = userdata;
>
> -    struct background_worker* worker;
> -    struct fetcher_request* req;
> -    input_fetcher_t* fetcher;
> +    vlc_interrupt_init(&task->interrupt);
>
> -    vlc_interrupt_t interrupt;
> -    vlc_thread_t thread;
> -    atomic_bool active;
> -};
> +    input_item_Hold(item);
> +
> +    if (executor == fetcher->executor_local)
> +        task->runnable.run = RunSearchLocal;
> +    else if (executor == fetcher->executor_network)
> +        task->runnable.run = RunSearchNetwork;
> +    else
> +    {
> +        assert(executor == fetcher->executor_downloader);
> +        task->runnable.run = RunDownloader;
> +    }
> +
> +    task->runnable.userdata = task;
> +
> +    return task;
> +}
> +
> +static void
> +TaskDelete(struct task *task)
> +{
> +    input_item_Release(task->item);
> +    vlc_interrupt_deinit(&task->interrupt);
> +    free(task);
> +}
> +
> +static void
> +FetcherAddTask(input_fetcher_t *fetcher, struct task *task)
> +{
> +    vlc_mutex_lock(&fetcher->lock);
> +    vlc_list_append(&task->node, &fetcher->submitted_tasks);
> +    vlc_mutex_unlock(&fetcher->lock);
> +}
> +
> +static void
> +FetcherRemoveTask(input_fetcher_t *fetcher, struct task *task)
> +{
> +    vlc_mutex_lock(&fetcher->lock);
> +    vlc_list_remove(&task->node);
> +    vlc_mutex_unlock(&fetcher->lock);
> +}
> +
> +static int
> +Submit(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
> +       input_item_meta_request_option_t options,
> +       const input_fetcher_callbacks_t *cbs, void *userdata)
> +{
> +    struct task *task =
> +        TaskNew(fetcher, executor, item, options, cbs, userdata);
> +    if (!task)
> +        return VLC_ENOMEM;
> +
> +    FetcherAddTask(fetcher, task);
> +    vlc_executor_Submit(task->executor, &task->runnable);
> +
> +    return VLC_SUCCESS;
> +}
>
>  static char* CreateCacheKey( input_item_t* item )
>  {
> @@ -191,13 +266,13 @@ static int SearchArt( input_fetcher_t* fetcher, input_item_t* item, int scope)
>      return CheckArt( item );
>  }
>
> -static int SearchByScope( input_fetcher_t* fetcher,
> -    struct fetcher_request* req, int scope )
> +static int SearchByScope(struct task *task, int scope)
>  {
> -    input_item_t* item = req->item;
> +    input_fetcher_t *fetcher = task->fetcher;
> +    input_item_t* item = task->item;
>
>      if( CheckMeta( item ) &&
> -        InvokeModule( fetcher, req->item, scope, "meta fetcher" ) )
> +        InvokeModule( fetcher, item, scope, "meta fetcher" ) )
>      {
>          return VLC_EGENERIC;
>      }
> @@ -208,26 +283,32 @@ static int SearchByScope( input_fetcher_t* fetcher,
>          ! input_FindArtInCache( item )             ||
>          ! SearchArt( fetcher, item, scope ) )
>      {
> -        AddAlbumCache( fetcher, req->item, false );
> -        if( !background_worker_Push( fetcher->downloader, req, NULL, 0 ) )
> +        AddAlbumCache( fetcher, task->item, false );
> +        int ret = Submit(fetcher, fetcher->executor_downloader, item,
> +                         task->options, task->cbs, task->userdata);
> +        if (ret == VLC_SUCCESS)
>              return VLC_SUCCESS;
>      }
>
>      return VLC_EGENERIC;
>  }
>
> -static void NotifyArtFetchEnded( struct fetcher_request* req, bool fetched )
> +static void NotifyArtFetchEnded(struct task *task, bool fetched)
>  {
> -    if (req->cbs && req->cbs->on_art_fetch_ended)
> -        req->cbs->on_art_fetch_ended(req->item, fetched, req->userdata);
> +    if (task->cbs && task->cbs->on_art_fetch_ended)
> +        task->cbs->on_art_fetch_ended(task->item, fetched, task->userdata);
>  }
>
> -static void Downloader( input_fetcher_t* fetcher,
> -    struct fetcher_request* req )
> +static void RunDownloader(void *userdata)
>  {
> -    ReadAlbumCache( fetcher, req->item );
> +    struct task *task = userdata;
> +    input_fetcher_t *fetcher = task->fetcher;
> +
> +    vlc_interrupt_set(&task->interrupt);
> +
> +    ReadAlbumCache( fetcher, task->item );
>
> -    char *psz_arturl = input_item_GetArtURL( req->item );
> +    char *psz_arturl = input_item_GetArtURL( task->item );
>      if( !psz_arturl )
>          goto error;
>
> @@ -266,151 +347,85 @@ static void Downloader( input_fetcher_t* fetcher,
>          goto error;
>      }
>
> -    input_SaveArt( fetcher->owner, req->item, output_stream.ptr,
> +    input_SaveArt( fetcher->owner, task->item, output_stream.ptr,
>                     output_stream.length, NULL );
>
>      free( output_stream.ptr );
> -    AddAlbumCache( fetcher, req->item, true );
> +    AddAlbumCache( fetcher, task->item, true );
>
>  out:
> +    vlc_interrupt_set(NULL);
> +
>      if( psz_arturl )
>      {
> -        var_SetAddress( fetcher->owner, "item-change", req->item );
> -        input_item_SetArtFetched( req->item, true );
> +        var_SetAddress( fetcher->owner, "item-change", task->item );
> +        input_item_SetArtFetched( task->item, true );
>      }
>
>      free( psz_arturl );
> -    NotifyArtFetchEnded(req, psz_arturl != NULL);
> +    NotifyArtFetchEnded(task, psz_arturl != NULL);
> +    FetcherRemoveTask(fetcher, task);
> +    TaskDelete(task);
>      return;
>
>  error:
> +    vlc_interrupt_set(NULL);
> +
>      FREENULL( psz_arturl );
> +    NotifyArtFetchEnded(task, false);
> +    FetcherRemoveTask(fetcher, task);
> +    TaskDelete(task);
>      goto out;
>  }
>
> -static void SearchLocal( input_fetcher_t* fetcher, struct fetcher_request* req )
> +static void RunSearchLocal(void *userdata)
>  {
> -    if( SearchByScope( fetcher, req, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
> -        return; /* done */
> +    struct task *task = userdata;
> +    input_fetcher_t *fetcher = task->fetcher;
> +
> +    vlc_interrupt_set(&task->interrupt);
> +
> +    if( SearchByScope( task, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
> +        goto end; /* done */
>
>      if( var_InheritBool( fetcher->owner, "metadata-network-access" ) ||
> -        req->options & META_REQUEST_OPTION_FETCH_NETWORK )
> +        task->options & META_REQUEST_OPTION_FETCH_NETWORK )
>      {
> -        if( background_worker_Push( fetcher->network, req, NULL, 0 ) )
> -            NotifyArtFetchEnded(req, false);
> +        int ret = Submit(fetcher, fetcher->executor_network, task->item,
> +                         task->options, task->cbs, task->userdata);
> +        if (ret != VLC_SUCCESS)
> +            NotifyArtFetchEnded(task, false);
>      }
>      else
>      {
> -        input_item_SetArtNotFound( req->item, true );
> -        NotifyArtFetchEnded(req, false);
> +        input_item_SetArtNotFound( task->item, true );
> +        NotifyArtFetchEnded(task, false);
>      }
> -}
>
> -static void SearchNetwork( input_fetcher_t* fetcher, struct fetcher_request* req )
> -{
> -    if( SearchByScope( fetcher, req, FETCHER_SCOPE_NETWORK ) )
> -    {
> -        input_item_SetArtNotFound( req->item, true );
> -        NotifyArtFetchEnded(req, false);
> -    }
> -}
> -
> -static void RequestRelease( void* req_ )
> -{
> -    struct fetcher_request* req = req_;
> +end:
> +    vlc_interrupt_set(NULL);
>
> -    if( !vlc_atomic_rc_dec( &req->rc ) )
> -        return;
> -
> -    input_item_Release( req->item );
> -    free( req );
> -}
> -
> -static void RequestHold( void* req_ )
> -{
> -    struct fetcher_request* req = req_;
> -    vlc_atomic_rc_inc( &req->rc );
> -}
> -
> -static void* FetcherThread( void* handle )
> -{
> -    struct fetcher_thread* th = handle;
> -    vlc_interrupt_set( &th->interrupt );
> -
> -    th->pf_worker( th->fetcher, th->req );
> -
> -    atomic_store( &th->active, false );
> -    background_worker_RequestProbe( th->worker );
> -    return NULL;
> +    FetcherRemoveTask(fetcher, task);
> +    TaskDelete(task);
>  }
>
> -static int StartWorker( input_fetcher_t* fetcher,
> -    void( *pf_worker )( input_fetcher_t*, struct fetcher_request* ),
> -    struct background_worker* bg, struct fetcher_request* req, void** handle )
> +static void RunSearchNetwork(void *userdata)
>  {
> -    struct fetcher_thread* th = malloc( sizeof *th );
> -
> -    if( unlikely( !th ) )
> -        return VLC_ENOMEM;
> -
> -    th->req = req;
> -    th->worker = bg;
> -    th->fetcher = fetcher;
> -    th->pf_worker = pf_worker;
> +    struct task *task = userdata;
>
> -    vlc_interrupt_init( &th->interrupt );
> -    atomic_init( &th->active, true );
> +    vlc_interrupt_set(&task->interrupt);
>
> -    if( !vlc_clone( &th->thread, FetcherThread, th, VLC_THREAD_PRIORITY_LOW ) )
> +    if( SearchByScope( task, FETCHER_SCOPE_NETWORK ) != VLC_SUCCESS )
>      {
> -        *handle = th;
> -        return VLC_SUCCESS;
> +        input_item_SetArtNotFound( task->item, true );
> +        NotifyArtFetchEnded(task, false);
>      }
>
> -    vlc_interrupt_deinit( &th->interrupt );
> -    free( th );
> -    return VLC_EGENERIC;
> -}
> +    vlc_interrupt_set(NULL);
>
> -static int ProbeWorker( void* fetcher_, void* th_ )
> -{
> -    return !atomic_load( &((struct fetcher_thread*)th_)->active );
> -    VLC_UNUSED( fetcher_ );
> -}
> -
> -static void CloseWorker( void* fetcher_, void* th_ )
> -{
> -    struct fetcher_thread* th = th_;
> -    VLC_UNUSED( fetcher_ );
> -
> -    vlc_interrupt_kill( &th->interrupt );
> -    vlc_join( th->thread, NULL );
> -    vlc_interrupt_deinit( &th->interrupt );
> -    free( th );
> -}
> -
> -#define DEF_STARTER(name, worker) \
> -static int Start ## name( void* fetcher_, void* req_, void** out ) { \
> -    input_fetcher_t* fetcher = fetcher_; \
> -    return StartWorker( fetcher, name, worker, req_, out ); }
> -
> -DEF_STARTER(  SearchLocal, fetcher->local )
> -DEF_STARTER(SearchNetwork, fetcher->network )
> -DEF_STARTER(   Downloader, fetcher->downloader )
> -
> -static void WorkerInit( input_fetcher_t* fetcher,
> -    struct background_worker** worker, int( *starter )( void*, void*, void** ) )
> -{
> -    struct background_worker_config conf = {
> -        .default_timeout = 0,
> -        .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ),
> -        .pf_start = starter,
> -        .pf_probe = ProbeWorker,
> -        .pf_stop = CloseWorker,
> -        .pf_release = RequestRelease,
> -        .pf_hold = RequestHold };
> -
> -    *worker = background_worker_New( fetcher, &conf );
> +    input_fetcher_t *fetcher = task->fetcher;
> +    FetcherRemoveTask(fetcher, task);
> +    TaskDelete(task);
>  }
>
>  input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
> @@ -420,65 +435,85 @@ input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
>      if( unlikely( !fetcher ) )
>          return NULL;
>
> -    fetcher->owner = owner;
> -
> -    WorkerInit( fetcher, &fetcher->local, StartSearchLocal );
> -    WorkerInit( fetcher, &fetcher->network, StartSearchNetwork );
> -    WorkerInit( fetcher, &fetcher->downloader, StartDownloader );
> +    int max_threads = var_InheritInteger(owner, "fetch-art-threads");
> +    if (max_threads < 1)
> +        max_threads = 1;
>
> -    if( unlikely( !fetcher->local || !fetcher->network || !fetcher->downloader ) )
> +    fetcher->executor_local = vlc_executor_New(max_threads);
> +    if (!fetcher->executor_local)
>      {
> -        if( fetcher->local )
> -            background_worker_Delete( fetcher->local );
> -
> -        if( fetcher->network )
> -            background_worker_Delete( fetcher->network );
> +        free(fetcher);
> +        return NULL;
> +    }
>
> -        if( fetcher->downloader )
> -            background_worker_Delete( fetcher->downloader );
> +    fetcher->executor_network = vlc_executor_New(max_threads);
> +    if (!fetcher->executor_network)
> +    {
> +        vlc_executor_Delete(fetcher->executor_local);
> +        free(fetcher);
> +        return NULL;
> +    }
>
> -        free( fetcher );
> +    fetcher->executor_downloader = vlc_executor_New(max_threads);
> +    if (!fetcher->executor_downloader)
> +    {
> +        vlc_executor_Delete(fetcher->executor_network);
> +        vlc_executor_Delete(fetcher->executor_local);
> +        free(fetcher);
>          return NULL;
>      }
>
> -    vlc_mutex_init( &fetcher->lock );
> +    fetcher->owner = owner;
> +
> +    vlc_mutex_init(&fetcher->lock);
> +    vlc_list_init(&fetcher->submitted_tasks);
> +
>      vlc_dictionary_init( &fetcher->album_cache, 0 );
>
>      return fetcher;
>  }
>
> -int input_fetcher_Push( input_fetcher_t* fetcher, input_item_t* item,
> +int input_fetcher_Push(input_fetcher_t* fetcher, input_item_t* item,
>      input_item_meta_request_option_t options,
> -    const input_fetcher_callbacks_t *cbs, void *cbs_userdata )
> +    const input_fetcher_callbacks_t *cbs, void *cbs_userdata)
>  {
>      assert(options & META_REQUEST_OPTION_FETCH_ANY);
> -    struct fetcher_request* req = malloc( sizeof *req );
>
> -    if( unlikely( !req ) )
> -        return VLC_ENOMEM;
> +    vlc_executor_t *executor = options & META_REQUEST_OPTION_FETCH_LOCAL
> +                             ? fetcher->executor_local
> +                             : fetcher->executor_network;
>
> -    req->item = item;
> -    req->options = options;
> -    req->cbs = cbs;
> -    req->userdata = cbs_userdata;
> +    return Submit(fetcher, executor, item, options, cbs, cbs_userdata);
> +}
>
> -    vlc_atomic_rc_init( &req->rc );
> -    input_item_Hold( item );
> +static void
> +CancelAllTasks(input_fetcher_t *fetcher)
> +{
> +    vlc_mutex_lock(&fetcher->lock);
>
> -    struct background_worker* worker =
> -        options & META_REQUEST_OPTION_FETCH_LOCAL ? fetcher->local : fetcher->network;
> -    if( background_worker_Push( worker, req, NULL, 0 ) )
> -        NotifyArtFetchEnded(req, false);
> +    struct task *task;
> +    vlc_list_foreach(task, &fetcher->submitted_tasks, node)
> +    {
> +        bool canceled = vlc_executor_Cancel(task->executor, &task->runnable);
> +        if (canceled)
> +        {
> +            NotifyArtFetchEnded(task, false);
> +            vlc_list_remove(&task->node);
> +            TaskDelete(task);
> +        }
> +        /* Otherwise, the task will be finished after run() */

you could add “finished and destroyed” just to be exhaustive
and readable.

> +    }
>
> -    RequestRelease( req );
> -    return VLC_SUCCESS;
> +    vlc_mutex_unlock(&fetcher->lock);
>  }
>
>  void input_fetcher_Delete( input_fetcher_t* fetcher )
>  {
> -    background_worker_Delete( fetcher->local );
> -    background_worker_Delete( fetcher->network );
> -    background_worker_Delete( fetcher->downloader );
> +    CancelAllTasks(fetcher);
> +
> +    vlc_executor_Delete(fetcher->executor_local);
> +    vlc_executor_Delete(fetcher->executor_network);
> +    vlc_executor_Delete(fetcher->executor_downloader);
>
>      vlc_dictionary_clear( &fetcher->album_cache, FreeCacheEntry, NULL );
>      free( fetcher );
> --
> 2.28.0
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel


More information about the vlc-devel mailing list