[vlc-devel] [PATCH 4/6] fetcher: use vlc_executor_t
Thomas Guillem
thomas at gllm.fr
Tue Sep 8 13:18:09 CEST 2020
On Tue, Sep 8, 2020, at 12:36, Alexandre Janniaux wrote:
> Hi,
>
> I like the end result very much, though the race condition
> on Cancel is a bit inconvenient. I don't have any better way
> for now anyway.
>
> Just a few comments below:
>
> On Mon, Sep 07, 2020 at 05:40:09PM +0200, Romain Vimont wrote:
> > Replace the background workers by executors.
> > ---
> > src/preparser/fetcher.c | 379 ++++++++++++++++++++++------------------
> > 1 file changed, 207 insertions(+), 172 deletions(-)
> >
> > diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
> > index 5bb4aa1cef..d237ded782 100644
> > --- a/src/preparser/fetcher.c
> > +++ b/src/preparser/fetcher.c
> > @@ -31,43 +31,118 @@
> > #include <vlc_threads.h>
> > #include <vlc_memstream.h>
> > #include <vlc_meta_fetcher.h>
> > +#include <vlc_executor.h>
> >
> > #include "art.h"
> > #include "libvlc.h"
> > #include "fetcher.h"
> > #include "input/input_interface.h"
> > -#include "misc/background_worker.h"
> > #include "misc/interrupt.h"
> >
> > struct input_fetcher_t {
> > - struct background_worker* local;
> > - struct background_worker* network;
> > - struct background_worker* downloader;
> > + vlc_executor_t *executor_local;
> > + vlc_executor_t *executor_network;
> > + vlc_executor_t *executor_downloader;
>
> I'm ok with it, but we probably want to have a single executor
> for all of them in the very end, don't we?
No, you don't want to block local parsing because all threads from the network executor are waiting for I/O.
>
> It's probably a premature notice though, because we probably
> need a lot of refactor before it becomes of any use.
>
> If I'm not correct, maybe this refactor could also add some
> documentation if you had the time to understand why there are
> three of them?
>
> >
> > vlc_dictionary_t album_cache;
> > vlc_object_t* owner;
> > +
> > vlc_mutex_t lock;
> > + struct vlc_list submitted_tasks; /**< list of struct task */
> > };
> >
> > -struct fetcher_request {
> > +struct task {
> > + input_fetcher_t *fetcher;
> > + vlc_executor_t *executor;
> > input_item_t* item;
> > - vlc_atomic_rc_t rc;
> > int options;
> > const input_fetcher_callbacks_t *cbs;
> > void *userdata;
> > +
> > + vlc_interrupt_t interrupt;
> > +
> > + struct vlc_runnable runnable; /**< to be passed to the executor */
> > + struct vlc_list node; /**< node of input_fetcher_t.submitted_tasks */
> > };
> >
> > -struct fetcher_thread {
> > - void (*pf_worker)( input_fetcher_t*, struct fetcher_request* );
> > +static void RunDownloader(void *);
> > +static void RunSearchLocal(void *);
> > +static void RunSearchNetwork(void *);
> > +
> > +static struct task *
> > +TaskNew(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
> > + input_item_meta_request_option_t options,
> > + const input_fetcher_callbacks_t *cbs, void *userdata)
> > +{
> > + struct task *task = malloc(sizeof(*task));
> > + if (!task)
> > + return NULL;
> > +
> > + task->fetcher = fetcher;
> > + task->executor = executor;
> > + task->item = item;
> > + task->options = options;
> > + task->cbs = cbs;
> > + task->userdata = userdata;
> >
> > - struct background_worker* worker;
> > - struct fetcher_request* req;
> > - input_fetcher_t* fetcher;
> > + vlc_interrupt_init(&task->interrupt);
> >
> > - vlc_interrupt_t interrupt;
> > - vlc_thread_t thread;
> > - atomic_bool active;
> > -};
> > + input_item_Hold(item);
> > +
> > + if (executor == fetcher->executor_local)
> > + task->runnable.run = RunSearchLocal;
> > + else if (executor == fetcher->executor_network)
> > + task->runnable.run = RunSearchNetwork;
> > + else
> > + {
> > + assert(executor == fetcher->executor_downloader);
> > + task->runnable.run = RunDownloader;
> > + }
> > +
> > + task->runnable.userdata = task;
> > +
> > + return task;
> > +}
> > +
> > +static void
> > +TaskDelete(struct task *task)
> > +{
> > + input_item_Release(task->item);
> > + vlc_interrupt_deinit(&task->interrupt);
> > + free(task);
> > +}
> > +
> > +static void
> > +FetcherAddTask(input_fetcher_t *fetcher, struct task *task)
> > +{
> > + vlc_mutex_lock(&fetcher->lock);
> > + vlc_list_append(&task->node, &fetcher->submitted_tasks);
> > + vlc_mutex_unlock(&fetcher->lock);
> > +}
> > +
> > +static void
> > +FetcherRemoveTask(input_fetcher_t *fetcher, struct task *task)
> > +{
> > + vlc_mutex_lock(&fetcher->lock);
> > + vlc_list_remove(&task->node);
> > + vlc_mutex_unlock(&fetcher->lock);
> > +}
> > +
> > +static int
> > +Submit(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
> > + input_item_meta_request_option_t options,
> > + const input_fetcher_callbacks_t *cbs, void *userdata)
> > +{
> > + struct task *task =
> > + TaskNew(fetcher, executor, item, options, cbs, userdata);
> > + if (!task)
> > + return VLC_ENOMEM;
> > +
> > + FetcherAddTask(fetcher, task);
> > + vlc_executor_Submit(task->executor, &task->runnable);
> > +
> > + return VLC_SUCCESS;
> > +}
> >
> > static char* CreateCacheKey( input_item_t* item )
> > {
> > @@ -191,13 +266,13 @@ static int SearchArt( input_fetcher_t* fetcher, input_item_t* item, int scope)
> > return CheckArt( item );
> > }
> >
> > -static int SearchByScope( input_fetcher_t* fetcher,
> > - struct fetcher_request* req, int scope )
> > +static int SearchByScope(struct task *task, int scope)
> > {
> > - input_item_t* item = req->item;
> > + input_fetcher_t *fetcher = task->fetcher;
> > + input_item_t* item = task->item;
> >
> > if( CheckMeta( item ) &&
> > - InvokeModule( fetcher, req->item, scope, "meta fetcher" ) )
> > + InvokeModule( fetcher, item, scope, "meta fetcher" ) )
> > {
> > return VLC_EGENERIC;
> > }
> > @@ -208,26 +283,32 @@ static int SearchByScope( input_fetcher_t* fetcher,
> > ! input_FindArtInCache( item ) ||
> > ! SearchArt( fetcher, item, scope ) )
> > {
> > - AddAlbumCache( fetcher, req->item, false );
> > - if( !background_worker_Push( fetcher->downloader, req, NULL, 0 ) )
> > + AddAlbumCache( fetcher, task->item, false );
> > + int ret = Submit(fetcher, fetcher->executor_downloader, item,
> > + task->options, task->cbs, task->userdata);
> > + if (ret == VLC_SUCCESS)
> > return VLC_SUCCESS;
> > }
> >
> > return VLC_EGENERIC;
> > }
> >
> > -static void NotifyArtFetchEnded( struct fetcher_request* req, bool fetched )
> > +static void NotifyArtFetchEnded(struct task *task, bool fetched)
> > {
> > - if (req->cbs && req->cbs->on_art_fetch_ended)
> > - req->cbs->on_art_fetch_ended(req->item, fetched, req->userdata);
> > + if (task->cbs && task->cbs->on_art_fetch_ended)
> > + task->cbs->on_art_fetch_ended(task->item, fetched, task->userdata);
> > }
> >
> > -static void Downloader( input_fetcher_t* fetcher,
> > - struct fetcher_request* req )
> > +static void RunDownloader(void *userdata)
> > {
> > - ReadAlbumCache( fetcher, req->item );
> > + struct task *task = userdata;
> > + input_fetcher_t *fetcher = task->fetcher;
> > +
> > + vlc_interrupt_set(&task->interrupt);
> > +
> > + ReadAlbumCache( fetcher, task->item );
> >
> > - char *psz_arturl = input_item_GetArtURL( req->item );
> > + char *psz_arturl = input_item_GetArtURL( task->item );
> > if( !psz_arturl )
> > goto error;
> >
> > @@ -266,151 +347,85 @@ static void Downloader( input_fetcher_t* fetcher,
> > goto error;
> > }
> >
> > - input_SaveArt( fetcher->owner, req->item, output_stream.ptr,
> > + input_SaveArt( fetcher->owner, task->item, output_stream.ptr,
> > output_stream.length, NULL );
> >
> > free( output_stream.ptr );
> > - AddAlbumCache( fetcher, req->item, true );
> > + AddAlbumCache( fetcher, task->item, true );
> >
> > out:
> > + vlc_interrupt_set(NULL);
> > +
> > if( psz_arturl )
> > {
> > - var_SetAddress( fetcher->owner, "item-change", req->item );
> > - input_item_SetArtFetched( req->item, true );
> > + var_SetAddress( fetcher->owner, "item-change", task->item );
> > + input_item_SetArtFetched( task->item, true );
> > }
> >
> > free( psz_arturl );
> > - NotifyArtFetchEnded(req, psz_arturl != NULL);
> > + NotifyArtFetchEnded(task, psz_arturl != NULL);
> > + FetcherRemoveTask(fetcher, task);
> > + TaskDelete(task);
> > return;
> >
> > error:
> > + vlc_interrupt_set(NULL);
> > +
> > FREENULL( psz_arturl );
> > + NotifyArtFetchEnded(task, false);
> > + FetcherRemoveTask(fetcher, task);
> > + TaskDelete(task);
> > goto out;
> > }
> >
> > -static void SearchLocal( input_fetcher_t* fetcher, struct fetcher_request* req )
> > +static void RunSearchLocal(void *userdata)
> > {
> > - if( SearchByScope( fetcher, req, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
> > - return; /* done */
> > + struct task *task = userdata;
> > + input_fetcher_t *fetcher = task->fetcher;
> > +
> > + vlc_interrupt_set(&task->interrupt);
> > +
> > + if( SearchByScope( task, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
> > + goto end; /* done */
> >
> > if( var_InheritBool( fetcher->owner, "metadata-network-access" ) ||
> > - req->options & META_REQUEST_OPTION_FETCH_NETWORK )
> > + task->options & META_REQUEST_OPTION_FETCH_NETWORK )
> > {
> > - if( background_worker_Push( fetcher->network, req, NULL, 0 ) )
> > - NotifyArtFetchEnded(req, false);
> > + int ret = Submit(fetcher, fetcher->executor_network, task->item,
> > + task->options, task->cbs, task->userdata);
> > + if (ret != VLC_SUCCESS)
> > + NotifyArtFetchEnded(task, false);
> > }
> > else
> > {
> > - input_item_SetArtNotFound( req->item, true );
> > - NotifyArtFetchEnded(req, false);
> > + input_item_SetArtNotFound( task->item, true );
> > + NotifyArtFetchEnded(task, false);
> > }
> > -}
> >
> > -static void SearchNetwork( input_fetcher_t* fetcher, struct fetcher_request* req )
> > -{
> > - if( SearchByScope( fetcher, req, FETCHER_SCOPE_NETWORK ) )
> > - {
> > - input_item_SetArtNotFound( req->item, true );
> > - NotifyArtFetchEnded(req, false);
> > - }
> > -}
> > -
> > -static void RequestRelease( void* req_ )
> > -{
> > - struct fetcher_request* req = req_;
> > +end:
> > + vlc_interrupt_set(NULL);
> >
> > - if( !vlc_atomic_rc_dec( &req->rc ) )
> > - return;
> > -
> > - input_item_Release( req->item );
> > - free( req );
> > -}
> > -
> > -static void RequestHold( void* req_ )
> > -{
> > - struct fetcher_request* req = req_;
> > - vlc_atomic_rc_inc( &req->rc );
> > -}
> > -
> > -static void* FetcherThread( void* handle )
> > -{
> > - struct fetcher_thread* th = handle;
> > - vlc_interrupt_set( &th->interrupt );
> > -
> > - th->pf_worker( th->fetcher, th->req );
> > -
> > - atomic_store( &th->active, false );
> > - background_worker_RequestProbe( th->worker );
> > - return NULL;
> > + FetcherRemoveTask(fetcher, task);
> > + TaskDelete(task);
> > }
> >
> > -static int StartWorker( input_fetcher_t* fetcher,
> > - void( *pf_worker )( input_fetcher_t*, struct fetcher_request* ),
> > - struct background_worker* bg, struct fetcher_request* req, void** handle )
> > +static void RunSearchNetwork(void *userdata)
> > {
> > - struct fetcher_thread* th = malloc( sizeof *th );
> > -
> > - if( unlikely( !th ) )
> > - return VLC_ENOMEM;
> > -
> > - th->req = req;
> > - th->worker = bg;
> > - th->fetcher = fetcher;
> > - th->pf_worker = pf_worker;
> > + struct task *task = userdata;
> >
> > - vlc_interrupt_init( &th->interrupt );
> > - atomic_init( &th->active, true );
> > + vlc_interrupt_set(&task->interrupt);
> >
> > - if( !vlc_clone( &th->thread, FetcherThread, th, VLC_THREAD_PRIORITY_LOW ) )
> > + if( SearchByScope( task, FETCHER_SCOPE_NETWORK ) != VLC_SUCCESS )
> > {
> > - *handle = th;
> > - return VLC_SUCCESS;
> > + input_item_SetArtNotFound( task->item, true );
> > + NotifyArtFetchEnded(task, false);
> > }
> >
> > - vlc_interrupt_deinit( &th->interrupt );
> > - free( th );
> > - return VLC_EGENERIC;
> > -}
> > + vlc_interrupt_set(NULL);
> >
> > -static int ProbeWorker( void* fetcher_, void* th_ )
> > -{
> > - return !atomic_load( &((struct fetcher_thread*)th_)->active );
> > - VLC_UNUSED( fetcher_ );
> > -}
> > -
> > -static void CloseWorker( void* fetcher_, void* th_ )
> > -{
> > - struct fetcher_thread* th = th_;
> > - VLC_UNUSED( fetcher_ );
> > -
> > - vlc_interrupt_kill( &th->interrupt );
> > - vlc_join( th->thread, NULL );
> > - vlc_interrupt_deinit( &th->interrupt );
> > - free( th );
> > -}
> > -
> > -#define DEF_STARTER(name, worker) \
> > -static int Start ## name( void* fetcher_, void* req_, void** out ) { \
> > - input_fetcher_t* fetcher = fetcher_; \
> > - return StartWorker( fetcher, name, worker, req_, out ); }
> > -
> > -DEF_STARTER( SearchLocal, fetcher->local )
> > -DEF_STARTER(SearchNetwork, fetcher->network )
> > -DEF_STARTER( Downloader, fetcher->downloader )
> > -
> > -static void WorkerInit( input_fetcher_t* fetcher,
> > - struct background_worker** worker, int( *starter )( void*, void*, void** ) )
> > -{
> > - struct background_worker_config conf = {
> > - .default_timeout = 0,
> > - .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ),
> > - .pf_start = starter,
> > - .pf_probe = ProbeWorker,
> > - .pf_stop = CloseWorker,
> > - .pf_release = RequestRelease,
> > - .pf_hold = RequestHold };
> > -
> > - *worker = background_worker_New( fetcher, &conf );
> > + input_fetcher_t *fetcher = task->fetcher;
> > + FetcherRemoveTask(fetcher, task);
> > + TaskDelete(task);
> > }
> >
> > input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
> > @@ -420,65 +435,85 @@ input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
> > if( unlikely( !fetcher ) )
> > return NULL;
> >
> > - fetcher->owner = owner;
> > -
> > - WorkerInit( fetcher, &fetcher->local, StartSearchLocal );
> > - WorkerInit( fetcher, &fetcher->network, StartSearchNetwork );
> > - WorkerInit( fetcher, &fetcher->downloader, StartDownloader );
> > + int max_threads = var_InheritInteger(owner, "fetch-art-threads");
> > + if (max_threads < 1)
> > + max_threads = 1;
> >
> > - if( unlikely( !fetcher->local || !fetcher->network || !fetcher->downloader ) )
> > + fetcher->executor_local = vlc_executor_New(max_threads);
> > + if (!fetcher->executor_local)
> > {
> > - if( fetcher->local )
> > - background_worker_Delete( fetcher->local );
> > -
> > - if( fetcher->network )
> > - background_worker_Delete( fetcher->network );
> > + free(fetcher);
> > + return NULL;
> > + }
> >
> > - if( fetcher->downloader )
> > - background_worker_Delete( fetcher->downloader );
> > + fetcher->executor_network = vlc_executor_New(max_threads);
> > + if (!fetcher->executor_network)
> > + {
> > + vlc_executor_Delete(fetcher->executor_local);
> > + free(fetcher);
> > + return NULL;
> > + }
> >
> > - free( fetcher );
> > + fetcher->executor_downloader = vlc_executor_New(max_threads);
> > + if (!fetcher->executor_downloader)
> > + {
> > + vlc_executor_Delete(fetcher->executor_network);
> > + vlc_executor_Delete(fetcher->executor_local);
> > + free(fetcher);
> > return NULL;
> > }
> >
> > - vlc_mutex_init( &fetcher->lock );
> > + fetcher->owner = owner;
> > +
> > + vlc_mutex_init(&fetcher->lock);
> > + vlc_list_init(&fetcher->submitted_tasks);
> > +
> > vlc_dictionary_init( &fetcher->album_cache, 0 );
> >
> > return fetcher;
> > }
> >
> > -int input_fetcher_Push( input_fetcher_t* fetcher, input_item_t* item,
> > +int input_fetcher_Push(input_fetcher_t* fetcher, input_item_t* item,
> > input_item_meta_request_option_t options,
> > - const input_fetcher_callbacks_t *cbs, void *cbs_userdata )
> > + const input_fetcher_callbacks_t *cbs, void *cbs_userdata)
> > {
> > assert(options & META_REQUEST_OPTION_FETCH_ANY);
> > - struct fetcher_request* req = malloc( sizeof *req );
> >
> > - if( unlikely( !req ) )
> > - return VLC_ENOMEM;
> > + vlc_executor_t *executor = options & META_REQUEST_OPTION_FETCH_LOCAL
> > + ? fetcher->executor_local
> > + : fetcher->executor_network;
> >
> > - req->item = item;
> > - req->options = options;
> > - req->cbs = cbs;
> > - req->userdata = cbs_userdata;
> > + return Submit(fetcher, executor, item, options, cbs, cbs_userdata);
> > +}
> >
> > - vlc_atomic_rc_init( &req->rc );
> > - input_item_Hold( item );
> > +static void
> > +CancelAllTasks(input_fetcher_t *fetcher)
> > +{
> > + vlc_mutex_lock(&fetcher->lock);
> >
> > - struct background_worker* worker =
> > - options & META_REQUEST_OPTION_FETCH_LOCAL ? fetcher->local : fetcher->network;
> > - if( background_worker_Push( worker, req, NULL, 0 ) )
> > - NotifyArtFetchEnded(req, false);
> > + struct task *task;
> > + vlc_list_foreach(task, &fetcher->submitted_tasks, node)
> > + {
> > + bool canceled = vlc_executor_Cancel(task->executor, &task->runnable);
> > + if (canceled)
> > + {
> > + NotifyArtFetchEnded(task, false);
> > + vlc_list_remove(&task->node);
> > + TaskDelete(task);
> > + }
> > + /* Otherwise, the task will be finished after run() */
>
> you could add “finished and destroyed” just to be exhaustive
> and readable.
>
> > + }
> >
> > - RequestRelease( req );
> > - return VLC_SUCCESS;
> > + vlc_mutex_unlock(&fetcher->lock);
> > }
> >
> > void input_fetcher_Delete( input_fetcher_t* fetcher )
> > {
> > - background_worker_Delete( fetcher->local );
> > - background_worker_Delete( fetcher->network );
> > - background_worker_Delete( fetcher->downloader );
> > + CancelAllTasks(fetcher);
> > +
> > + vlc_executor_Delete(fetcher->executor_local);
> > + vlc_executor_Delete(fetcher->executor_network);
> > + vlc_executor_Delete(fetcher->executor_downloader);
> >
> > vlc_dictionary_clear( &fetcher->album_cache, FreeCacheEntry, NULL );
> > free( fetcher );
> > --
> > 2.28.0
> >
> > _______________________________________________
> > vlc-devel mailing list
> > To unsubscribe or modify your subscription options:
> > https://mailman.videolan.org/listinfo/vlc-devel
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel
More information about the vlc-devel
mailing list