[vlc-devel] [PATCH v2 4/8] fetcher: use vlc_executor_t
Romain Vimont
rom1v at videolabs.io
Sun Sep 13 21:11:17 CEST 2020
Replace the background workers with executors.
---
src/preparser/fetcher.c | 379 ++++++++++++++++++++++------------------
1 file changed, 207 insertions(+), 172 deletions(-)
diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
index 5bb4aa1cef..7616078ff8 100644
--- a/src/preparser/fetcher.c
+++ b/src/preparser/fetcher.c
@@ -31,43 +31,118 @@
#include <vlc_threads.h>
#include <vlc_memstream.h>
#include <vlc_meta_fetcher.h>
+#include <vlc_executor.h>
#include "art.h"
#include "libvlc.h"
#include "fetcher.h"
#include "input/input_interface.h"
-#include "misc/background_worker.h"
#include "misc/interrupt.h"
struct input_fetcher_t { /* state shared by all fetch tasks of one fetcher */
- struct background_worker* local;
- struct background_worker* network;
- struct background_worker* downloader;
+ vlc_executor_t *executor_local; /**< tasks submitted here run RunSearchLocal() */
+ vlc_executor_t *executor_network; /**< tasks submitted here run RunSearchNetwork() */
+ vlc_executor_t *executor_downloader; /**< tasks submitted here run RunDownloader() */
vlc_dictionary_t album_cache;
vlc_object_t* owner;
+
vlc_mutex_t lock; /* held by FetcherAddTask/FetcherRemoveTask/CancelAllTasks around submitted_tasks */
+ struct vlc_list submitted_tasks; /**< list of struct task */
};
-struct fetcher_request {
+struct task { /* one fetch request queued on one of the fetcher's executors */
+ input_fetcher_t *fetcher; /* fetcher the task was created for */
+ vlc_executor_t *executor; /* executor the task is submitted to (and cancelled from) */
input_item_t* item; /* held by TaskNew(), released by TaskDelete() */
- vlc_atomic_rc_t rc;
int options;
const input_fetcher_callbacks_t *cbs; /* may be NULL: NotifyArtFetchEnded() checks it */
void *userdata; /* handed back to cbs->on_art_fetch_ended */
+
+ vlc_interrupt_t interrupt; /* set as the current interrupt context while the run function executes */
+
+ struct vlc_runnable runnable; /**< to be passed to the executor */
+ struct vlc_list node; /**< node of input_fetcher_t.submitted_tasks */
};
-struct fetcher_thread {
- void (*pf_worker)( input_fetcher_t*, struct fetcher_request* );
+static void RunDownloader(void *);
+static void RunSearchLocal(void *);
+static void RunSearchNetwork(void *);
+
+static struct task * /* Allocate and fill a task; returns NULL if malloc() fails. */
+TaskNew(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
+ input_item_meta_request_option_t options,
+ const input_fetcher_callbacks_t *cbs, void *userdata)
+{
+ struct task *task = malloc(sizeof(*task));
+ if (!task)
+ return NULL;
+
+ task->fetcher = fetcher;
+ task->executor = executor;
+ task->item = item;
+ task->options = options;
+ task->cbs = cbs;
+ task->userdata = userdata;
- struct background_worker* worker;
- struct fetcher_request* req;
- input_fetcher_t* fetcher;
+ vlc_interrupt_init(&task->interrupt);
- vlc_interrupt_t interrupt;
- vlc_thread_t thread;
- atomic_bool active;
-};
+ input_item_Hold(item); /* the task keeps its own reference; TaskDelete() drops it */
+
+ if (executor == fetcher->executor_local) /* the run function is derived from the target executor */
+ task->runnable.run = RunSearchLocal;
+ else if (executor == fetcher->executor_network)
+ task->runnable.run = RunSearchNetwork;
+ else
+ {
+ assert(executor == fetcher->executor_downloader); /* no other executor is expected here */
+ task->runnable.run = RunDownloader;
+ }
+
+ task->runnable.userdata = task; /* every Run*() function receives the task itself */
+
+ return task;
+}
+
+static void /* Destroy a task created by TaskNew(); the caller must already have unlinked task->node. */
+TaskDelete(struct task *task)
+{
+ input_item_Release(task->item); /* balances input_item_Hold() in TaskNew() */
+ vlc_interrupt_deinit(&task->interrupt);
+ free(task);
+}
+
+static void /* Append the task to fetcher->submitted_tasks under fetcher->lock. */
+FetcherAddTask(input_fetcher_t *fetcher, struct task *task)
+{
+ vlc_mutex_lock(&fetcher->lock);
+ vlc_list_append(&task->node, &fetcher->submitted_tasks);
+ vlc_mutex_unlock(&fetcher->lock);
+}
+
+static void /* Unlink the task from fetcher->submitted_tasks under fetcher->lock; does not free it. */
+FetcherRemoveTask(input_fetcher_t *fetcher, struct task *task)
+{
+ vlc_mutex_lock(&fetcher->lock);
+ vlc_list_remove(&task->node);
+ vlc_mutex_unlock(&fetcher->lock);
+}
+
+static int /* Create a task for item and queue it on executor; VLC_ENOMEM if the task cannot be allocated. */
+Submit(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
+ input_item_meta_request_option_t options,
+ const input_fetcher_callbacks_t *cbs, void *userdata)
+{
+ struct task *task =
+ TaskNew(fetcher, executor, item, options, cbs, userdata);
+ if (!task)
+ return VLC_ENOMEM; /* no callback is invoked on this path; callers decide whether to notify */
+
+ FetcherAddTask(fetcher, task); /* listed before submission, so the run function can always unlink it */
+ vlc_executor_Submit(task->executor, &task->runnable); /* from here the run function (or CancelAllTasks) deletes the task */
+
+ return VLC_SUCCESS;
+}
static char* CreateCacheKey( input_item_t* item )
{
@@ -191,13 +266,13 @@ static int SearchArt( input_fetcher_t* fetcher, input_item_t* item, int scope)
return CheckArt( item );
}
-static int SearchByScope( input_fetcher_t* fetcher,
- struct fetcher_request* req, int scope )
+static int SearchByScope(struct task *task, int scope)
{
- input_item_t* item = req->item;
+ input_fetcher_t *fetcher = task->fetcher;
+ input_item_t* item = task->item;
if( CheckMeta( item ) &&
- InvokeModule( fetcher, req->item, scope, "meta fetcher" ) )
+ InvokeModule( fetcher, item, scope, "meta fetcher" ) )
{
return VLC_EGENERIC;
}
@@ -208,26 +283,32 @@ static int SearchByScope( input_fetcher_t* fetcher,
! input_FindArtInCache( item ) ||
! SearchArt( fetcher, item, scope ) )
{
- AddAlbumCache( fetcher, req->item, false );
- if( !background_worker_Push( fetcher->downloader, req, NULL, 0 ) )
+ AddAlbumCache( fetcher, task->item, false );
+ int ret = Submit(fetcher, fetcher->executor_downloader, item,
+ task->options, task->cbs, task->userdata);
+ if (ret == VLC_SUCCESS)
return VLC_SUCCESS;
}
return VLC_EGENERIC;
}
-static void NotifyArtFetchEnded( struct fetcher_request* req, bool fetched )
+static void NotifyArtFetchEnded(struct task *task, bool fetched)
{ /* Report the outcome through on_art_fetch_ended, if the requester registered one. */
- if (req->cbs && req->cbs->on_art_fetch_ended)
- req->cbs->on_art_fetch_ended(req->item, fetched, req->userdata);
+ if (task->cbs && task->cbs->on_art_fetch_ended) /* both the table and the entry are optional */
+ task->cbs->on_art_fetch_ended(task->item, fetched, task->userdata);
}
-static void Downloader( input_fetcher_t* fetcher,
- struct fetcher_request* req )
+static void RunDownloader(void *userdata) /* run function of downloader tasks; userdata is the struct task, which is deleted before returning */
{
- ReadAlbumCache( fetcher, req->item );
+ struct task *task = userdata;
+ input_fetcher_t *fetcher = task->fetcher;
+
+ vlc_interrupt_set(&task->interrupt);
+
+ ReadAlbumCache( fetcher, task->item );
- char *psz_arturl = input_item_GetArtURL( req->item );
+ char *psz_arturl = input_item_GetArtURL( task->item );
if( !psz_arturl )
goto error;
@@ -266,151 +347,85 @@ static void Downloader( input_fetcher_t* fetcher,
goto error;
}
- input_SaveArt( fetcher->owner, req->item, output_stream.ptr,
+ input_SaveArt( fetcher->owner, task->item, output_stream.ptr,
output_stream.length, NULL );
free( output_stream.ptr );
- AddAlbumCache( fetcher, req->item, true );
+ AddAlbumCache( fetcher, task->item, true );
out:
+ vlc_interrupt_set(NULL);
+
if( psz_arturl )
{
- var_SetAddress( fetcher->owner, "item-change", req->item );
- input_item_SetArtFetched( req->item, true );
+ var_SetAddress( fetcher->owner, "item-change", task->item );
+ input_item_SetArtFetched( task->item, true );
}
free( psz_arturl );
- NotifyArtFetchEnded(req, psz_arturl != NULL);
+ NotifyArtFetchEnded(task, psz_arturl != NULL);
+ FetcherRemoveTask(fetcher, task);
+ TaskDelete(task); /* task is freed here: nothing after this line may touch it */
return;
error:
+ vlc_interrupt_set(NULL);
+
FREENULL( psz_arturl );
- goto out;
+ NotifyArtFetchEnded(task, false);
+ FetcherRemoveTask(fetcher, task);
+ TaskDelete(task);
+ return; /* must not jump to out: the task is already freed and out: would notify, unlink and free it again */
}
-static void SearchLocal( input_fetcher_t* fetcher, struct fetcher_request* req )
+static void RunSearchLocal(void *userdata)
{
- if( SearchByScope( fetcher, req, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
- return; /* done */
+ struct task *task = userdata;
+ input_fetcher_t *fetcher = task->fetcher;
+
+ vlc_interrupt_set(&task->interrupt);
+
+ if( SearchByScope( task, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
+ goto end; /* done */
if( var_InheritBool( fetcher->owner, "metadata-network-access" ) ||
- req->options & META_REQUEST_OPTION_FETCH_NETWORK )
+ task->options & META_REQUEST_OPTION_FETCH_NETWORK )
{
- if( background_worker_Push( fetcher->network, req, NULL, 0 ) )
- NotifyArtFetchEnded(req, false);
+ int ret = Submit(fetcher, fetcher->executor_network, task->item,
+ task->options, task->cbs, task->userdata);
+ if (ret != VLC_SUCCESS)
+ NotifyArtFetchEnded(task, false);
}
else
{
- input_item_SetArtNotFound( req->item, true );
- NotifyArtFetchEnded(req, false);
+ input_item_SetArtNotFound( task->item, true );
+ NotifyArtFetchEnded(task, false);
}
-}
-static void SearchNetwork( input_fetcher_t* fetcher, struct fetcher_request* req )
-{
- if( SearchByScope( fetcher, req, FETCHER_SCOPE_NETWORK ) )
- {
- input_item_SetArtNotFound( req->item, true );
- NotifyArtFetchEnded(req, false);
- }
-}
-
-static void RequestRelease( void* req_ )
-{
- struct fetcher_request* req = req_;
+end:
+ vlc_interrupt_set(NULL);
- if( !vlc_atomic_rc_dec( &req->rc ) )
- return;
-
- input_item_Release( req->item );
- free( req );
-}
-
-static void RequestHold( void* req_ )
-{
- struct fetcher_request* req = req_;
- vlc_atomic_rc_inc( &req->rc );
-}
-
-static void* FetcherThread( void* handle )
-{
- struct fetcher_thread* th = handle;
- vlc_interrupt_set( &th->interrupt );
-
- th->pf_worker( th->fetcher, th->req );
-
- atomic_store( &th->active, false );
- background_worker_RequestProbe( th->worker );
- return NULL;
+ FetcherRemoveTask(fetcher, task);
+ TaskDelete(task);
}
-static int StartWorker( input_fetcher_t* fetcher,
- void( *pf_worker )( input_fetcher_t*, struct fetcher_request* ),
- struct background_worker* bg, struct fetcher_request* req, void** handle )
+static void RunSearchNetwork(void *userdata)
{
- struct fetcher_thread* th = malloc( sizeof *th );
-
- if( unlikely( !th ) )
- return VLC_ENOMEM;
-
- th->req = req;
- th->worker = bg;
- th->fetcher = fetcher;
- th->pf_worker = pf_worker;
+ struct task *task = userdata;
- vlc_interrupt_init( &th->interrupt );
- atomic_init( &th->active, true );
+ vlc_interrupt_set(&task->interrupt);
- if( !vlc_clone( &th->thread, FetcherThread, th, VLC_THREAD_PRIORITY_LOW ) )
+ if( SearchByScope( task, FETCHER_SCOPE_NETWORK ) != VLC_SUCCESS )
{
- *handle = th;
- return VLC_SUCCESS;
+ input_item_SetArtNotFound( task->item, true );
+ NotifyArtFetchEnded(task, false);
}
- vlc_interrupt_deinit( &th->interrupt );
- free( th );
- return VLC_EGENERIC;
-}
+ vlc_interrupt_set(NULL);
-static int ProbeWorker( void* fetcher_, void* th_ )
-{
- return !atomic_load( &((struct fetcher_thread*)th_)->active );
- VLC_UNUSED( fetcher_ );
-}
-
-static void CloseWorker( void* fetcher_, void* th_ )
-{
- struct fetcher_thread* th = th_;
- VLC_UNUSED( fetcher_ );
-
- vlc_interrupt_kill( &th->interrupt );
- vlc_join( th->thread, NULL );
- vlc_interrupt_deinit( &th->interrupt );
- free( th );
-}
-
-#define DEF_STARTER(name, worker) \
-static int Start ## name( void* fetcher_, void* req_, void** out ) { \
- input_fetcher_t* fetcher = fetcher_; \
- return StartWorker( fetcher, name, worker, req_, out ); }
-
-DEF_STARTER( SearchLocal, fetcher->local )
-DEF_STARTER(SearchNetwork, fetcher->network )
-DEF_STARTER( Downloader, fetcher->downloader )
-
-static void WorkerInit( input_fetcher_t* fetcher,
- struct background_worker** worker, int( *starter )( void*, void*, void** ) )
-{
- struct background_worker_config conf = {
- .default_timeout = 0,
- .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ),
- .pf_start = starter,
- .pf_probe = ProbeWorker,
- .pf_stop = CloseWorker,
- .pf_release = RequestRelease,
- .pf_hold = RequestHold };
-
- *worker = background_worker_New( fetcher, &conf );
+ input_fetcher_t *fetcher = task->fetcher;
+ FetcherRemoveTask(fetcher, task);
+ TaskDelete(task);
}
input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
@@ -420,65 +435,85 @@ input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
if( unlikely( !fetcher ) )
return NULL;
- fetcher->owner = owner;
-
- WorkerInit( fetcher, &fetcher->local, StartSearchLocal );
- WorkerInit( fetcher, &fetcher->network, StartSearchNetwork );
- WorkerInit( fetcher, &fetcher->downloader, StartDownloader );
+ int max_threads = var_InheritInteger(owner, "fetch-art-threads");
+ if (max_threads < 1)
+ max_threads = 1;
- if( unlikely( !fetcher->local || !fetcher->network || !fetcher->downloader ) )
+ fetcher->executor_local = vlc_executor_New(max_threads);
+ if (!fetcher->executor_local)
{
- if( fetcher->local )
- background_worker_Delete( fetcher->local );
-
- if( fetcher->network )
- background_worker_Delete( fetcher->network );
+ free(fetcher);
+ return NULL;
+ }
- if( fetcher->downloader )
- background_worker_Delete( fetcher->downloader );
+ fetcher->executor_network = vlc_executor_New(max_threads);
+ if (!fetcher->executor_network)
+ {
+ vlc_executor_Delete(fetcher->executor_local);
+ free(fetcher);
+ return NULL;
+ }
- free( fetcher );
+ fetcher->executor_downloader = vlc_executor_New(max_threads);
+ if (!fetcher->executor_downloader)
+ {
+ vlc_executor_Delete(fetcher->executor_network);
+ vlc_executor_Delete(fetcher->executor_local);
+ free(fetcher);
return NULL;
}
- vlc_mutex_init( &fetcher->lock );
+ fetcher->owner = owner;
+
+ vlc_mutex_init(&fetcher->lock);
+ vlc_list_init(&fetcher->submitted_tasks);
+
vlc_dictionary_init( &fetcher->album_cache, 0 );
return fetcher;
}
-int input_fetcher_Push( input_fetcher_t* fetcher, input_item_t* item,
+int input_fetcher_Push(input_fetcher_t* fetcher, input_item_t* item,
input_item_meta_request_option_t options,
- const input_fetcher_callbacks_t *cbs, void *cbs_userdata )
+ const input_fetcher_callbacks_t *cbs, void *cbs_userdata)
{ /* Queue an art fetch for item; returns Submit()'s result (VLC_SUCCESS or VLC_ENOMEM). */
assert(options & META_REQUEST_OPTION_FETCH_ANY);
- struct fetcher_request* req = malloc( sizeof *req );
- if( unlikely( !req ) )
- return VLC_ENOMEM;
+ vlc_executor_t *executor = options & META_REQUEST_OPTION_FETCH_LOCAL /* local search first when requested, otherwise straight to network */
+ ? fetcher->executor_local
+ : fetcher->executor_network;
- req->item = item;
- req->options = options;
- req->cbs = cbs;
- req->userdata = cbs_userdata;
+ return Submit(fetcher, executor, item, options, cbs, cbs_userdata); /* on failure no on_art_fetch_ended callback is invoked */
+}
- vlc_atomic_rc_init( &req->rc );
- input_item_Hold( item );
+static void /* Try to cancel every listed task; cancelled ones are notified as failed and deleted here. */
+CancelAllTasks(input_fetcher_t *fetcher)
+{
+ vlc_mutex_lock(&fetcher->lock); /* a finishing task blocks in FetcherRemoveTask() meanwhile, so listed tasks stay valid */
- struct background_worker* worker =
- options & META_REQUEST_OPTION_FETCH_LOCAL ? fetcher->local : fetcher->network;
- if( background_worker_Push( worker, req, NULL, 0 ) )
- NotifyArtFetchEnded(req, false);
+ struct task *task;
+ vlc_list_foreach(task, &fetcher->submitted_tasks, node) /* NOTE(review): relies on vlc_list_foreach tolerating removal of the current node - confirm in vlc_list.h */
+ {
+ bool canceled = vlc_executor_Cancel(task->executor, &task->runnable);
+ if (canceled)
+ {
+ NotifyArtFetchEnded(task, false); /* invoked with fetcher->lock held */
+ vlc_list_remove(&task->node); /* direct removal: FetcherRemoveTask() would re-lock the held mutex */
+ TaskDelete(task);
+ }
+ /* Otherwise, the task will be finished and destroyed after run() */
+ }
- RequestRelease( req );
- return VLC_SUCCESS;
+ vlc_mutex_unlock(&fetcher->lock);
}
void input_fetcher_Delete( input_fetcher_t* fetcher )
{
- background_worker_Delete( fetcher->local );
- background_worker_Delete( fetcher->network );
- background_worker_Delete( fetcher->downloader );
+ CancelAllTasks(fetcher);
+
+ vlc_executor_Delete(fetcher->executor_local);
+ vlc_executor_Delete(fetcher->executor_network);
+ vlc_executor_Delete(fetcher->executor_downloader);
vlc_dictionary_clear( &fetcher->album_cache, FreeCacheEntry, NULL );
free( fetcher );
--
2.28.0
More information about the vlc-devel
mailing list