[vlc-devel] [PATCH 4/6] fetcher: use vlc_executor_t

Romain Vimont rom1v at videolabs.io
Mon Sep 7 17:40:09 CEST 2020


Replace the background workers with executors.
---
 src/preparser/fetcher.c | 379 ++++++++++++++++++++++------------------
 1 file changed, 207 insertions(+), 172 deletions(-)

diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
index 5bb4aa1cef..d237ded782 100644
--- a/src/preparser/fetcher.c
+++ b/src/preparser/fetcher.c
@@ -31,43 +31,118 @@
 #include <vlc_threads.h>
 #include <vlc_memstream.h>
 #include <vlc_meta_fetcher.h>
+#include <vlc_executor.h>
 
 #include "art.h"
 #include "libvlc.h"
 #include "fetcher.h"
 #include "input/input_interface.h"
-#include "misc/background_worker.h"
 #include "misc/interrupt.h"
 
 struct input_fetcher_t {
-    struct background_worker* local;
-    struct background_worker* network;
-    struct background_worker* downloader;
+    vlc_executor_t *executor_local;
+    vlc_executor_t *executor_network;
+    vlc_executor_t *executor_downloader;
 
     vlc_dictionary_t album_cache;
     vlc_object_t* owner;
+
     vlc_mutex_t lock;
+    struct vlc_list submitted_tasks; /**< list of struct task */
 };
 
-struct fetcher_request {
+struct task {
+    input_fetcher_t *fetcher;
+    vlc_executor_t *executor;
     input_item_t* item;
-    vlc_atomic_rc_t rc;
     int options;
     const input_fetcher_callbacks_t *cbs;
     void *userdata;
+
+    vlc_interrupt_t interrupt;
+
+    struct vlc_runnable runnable; /**< to be passed to the executor */
+    struct vlc_list node; /**< node of input_fetcher_t.submitted_tasks */
 };
 
-struct fetcher_thread {
-    void (*pf_worker)( input_fetcher_t*, struct fetcher_request* );
+static void RunDownloader(void *);
+static void RunSearchLocal(void *);
+static void RunSearchNetwork(void *);
+
+static struct task *
+TaskNew(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
+        input_item_meta_request_option_t options,
+        const input_fetcher_callbacks_t *cbs, void *userdata)
+{
+    struct task *task = malloc(sizeof(*task));
+    if (!task)
+        return NULL;
+
+    task->fetcher = fetcher;
+    task->executor = executor;
+    task->item = item;
+    task->options = options;
+    task->cbs = cbs;
+    task->userdata = userdata;
 
-    struct background_worker* worker;
-    struct fetcher_request* req;
-    input_fetcher_t* fetcher;
+    vlc_interrupt_init(&task->interrupt);
 
-    vlc_interrupt_t interrupt;
-    vlc_thread_t thread;
-    atomic_bool active;
-};
+    input_item_Hold(item);
+
+    if (executor == fetcher->executor_local)
+        task->runnable.run = RunSearchLocal;
+    else if (executor == fetcher->executor_network)
+        task->runnable.run = RunSearchNetwork;
+    else
+    {
+        assert(executor == fetcher->executor_downloader);
+        task->runnable.run = RunDownloader;
+    }
+
+    task->runnable.userdata = task;
+
+    return task;
+}
+
+static void
+TaskDelete(struct task *task)
+{
+    input_item_Release(task->item);
+    vlc_interrupt_deinit(&task->interrupt);
+    free(task);
+}
+
+static void
+FetcherAddTask(input_fetcher_t *fetcher, struct task *task)
+{
+    vlc_mutex_lock(&fetcher->lock);
+    vlc_list_append(&task->node, &fetcher->submitted_tasks);
+    vlc_mutex_unlock(&fetcher->lock);
+}
+
+static void
+FetcherRemoveTask(input_fetcher_t *fetcher, struct task *task)
+{
+    vlc_mutex_lock(&fetcher->lock);
+    vlc_list_remove(&task->node);
+    vlc_mutex_unlock(&fetcher->lock);
+}
+
+static int
+Submit(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item,
+       input_item_meta_request_option_t options,
+       const input_fetcher_callbacks_t *cbs, void *userdata)
+{
+    struct task *task =
+        TaskNew(fetcher, executor, item, options, cbs, userdata);
+    if (!task)
+        return VLC_ENOMEM;
+
+    FetcherAddTask(fetcher, task);
+    vlc_executor_Submit(task->executor, &task->runnable);
+
+    return VLC_SUCCESS;
+}
 
 static char* CreateCacheKey( input_item_t* item )
 {
@@ -191,13 +266,13 @@ static int SearchArt( input_fetcher_t* fetcher, input_item_t* item, int scope)
     return CheckArt( item );
 }
 
-static int SearchByScope( input_fetcher_t* fetcher,
-    struct fetcher_request* req, int scope )
+static int SearchByScope(struct task *task, int scope)
 {
-    input_item_t* item = req->item;
+    input_fetcher_t *fetcher = task->fetcher;
+    input_item_t* item = task->item;
 
     if( CheckMeta( item ) &&
-        InvokeModule( fetcher, req->item, scope, "meta fetcher" ) )
+        InvokeModule( fetcher, item, scope, "meta fetcher" ) )
     {
         return VLC_EGENERIC;
     }
@@ -208,26 +283,32 @@ static int SearchByScope( input_fetcher_t* fetcher,
         ! input_FindArtInCache( item )             ||
         ! SearchArt( fetcher, item, scope ) )
     {
-        AddAlbumCache( fetcher, req->item, false );
-        if( !background_worker_Push( fetcher->downloader, req, NULL, 0 ) )
+        AddAlbumCache( fetcher, task->item, false );
+        int ret = Submit(fetcher, fetcher->executor_downloader, item,
+                         task->options, task->cbs, task->userdata);
+        if (ret == VLC_SUCCESS)
             return VLC_SUCCESS;
     }
 
     return VLC_EGENERIC;
 }
 
-static void NotifyArtFetchEnded( struct fetcher_request* req, bool fetched )
+static void NotifyArtFetchEnded(struct task *task, bool fetched)
 {
-    if (req->cbs && req->cbs->on_art_fetch_ended)
-        req->cbs->on_art_fetch_ended(req->item, fetched, req->userdata);
+    if (task->cbs && task->cbs->on_art_fetch_ended)
+        task->cbs->on_art_fetch_ended(task->item, fetched, task->userdata);
 }
 
-static void Downloader( input_fetcher_t* fetcher,
-    struct fetcher_request* req )
+static void RunDownloader(void *userdata)
 {
-    ReadAlbumCache( fetcher, req->item );
+    struct task *task = userdata;
+    input_fetcher_t *fetcher = task->fetcher;
+
+    vlc_interrupt_set(&task->interrupt);
+
+    ReadAlbumCache( fetcher, task->item );
 
-    char *psz_arturl = input_item_GetArtURL( req->item );
+    char *psz_arturl = input_item_GetArtURL( task->item );
     if( !psz_arturl )
         goto error;
 
@@ -266,151 +347,85 @@ static void Downloader( input_fetcher_t* fetcher,
         goto error;
     }
 
-    input_SaveArt( fetcher->owner, req->item, output_stream.ptr,
+    input_SaveArt( fetcher->owner, task->item, output_stream.ptr,
                    output_stream.length, NULL );
 
     free( output_stream.ptr );
-    AddAlbumCache( fetcher, req->item, true );
+    AddAlbumCache( fetcher, task->item, true );
 
 out:
+    vlc_interrupt_set(NULL);
+
     if( psz_arturl )
     {
-        var_SetAddress( fetcher->owner, "item-change", req->item );
-        input_item_SetArtFetched( req->item, true );
+        var_SetAddress( fetcher->owner, "item-change", task->item );
+        input_item_SetArtFetched( task->item, true );
     }
 
     free( psz_arturl );
-    NotifyArtFetchEnded(req, psz_arturl != NULL);
+    NotifyArtFetchEnded(task, psz_arturl != NULL);
+    FetcherRemoveTask(fetcher, task);
+    TaskDelete(task);
     return;
 
 error:
+    /* Failure path: only drop the URL here, then fall into the shared
+     * exit at "out". That path resets the interrupt context, reports the
+     * failure (psz_arturl is NULL), unregisters the task and deletes it
+     * exactly once; repeating any of it here would notify twice and
+     * touch the task after TaskDelete(). */
     FREENULL( psz_arturl );
     goto out;
 }
 
-static void SearchLocal( input_fetcher_t* fetcher, struct fetcher_request* req )
+static void RunSearchLocal(void *userdata)
 {
-    if( SearchByScope( fetcher, req, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
-        return; /* done */
+    struct task *task = userdata;
+    input_fetcher_t *fetcher = task->fetcher;
+
+    vlc_interrupt_set(&task->interrupt);
+
+    if( SearchByScope( task, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS )
+        goto end; /* done */
 
     if( var_InheritBool( fetcher->owner, "metadata-network-access" ) ||
-        req->options & META_REQUEST_OPTION_FETCH_NETWORK )
+        task->options & META_REQUEST_OPTION_FETCH_NETWORK )
     {
-        if( background_worker_Push( fetcher->network, req, NULL, 0 ) )
-            NotifyArtFetchEnded(req, false);
+        int ret = Submit(fetcher, fetcher->executor_network, task->item,
+                         task->options, task->cbs, task->userdata);
+        if (ret != VLC_SUCCESS)
+            NotifyArtFetchEnded(task, false);
     }
     else
     {
-        input_item_SetArtNotFound( req->item, true );
-        NotifyArtFetchEnded(req, false);
+        input_item_SetArtNotFound( task->item, true );
+        NotifyArtFetchEnded(task, false);
     }
-}
 
-static void SearchNetwork( input_fetcher_t* fetcher, struct fetcher_request* req )
-{
-    if( SearchByScope( fetcher, req, FETCHER_SCOPE_NETWORK ) )
-    {
-        input_item_SetArtNotFound( req->item, true );
-        NotifyArtFetchEnded(req, false);
-    }
-}
-
-static void RequestRelease( void* req_ )
-{
-    struct fetcher_request* req = req_;
+end:
+    vlc_interrupt_set(NULL);
 
-    if( !vlc_atomic_rc_dec( &req->rc ) )
-        return;
-
-    input_item_Release( req->item );
-    free( req );
-}
-
-static void RequestHold( void* req_ )
-{
-    struct fetcher_request* req = req_;
-    vlc_atomic_rc_inc( &req->rc );
-}
-
-static void* FetcherThread( void* handle )
-{
-    struct fetcher_thread* th = handle;
-    vlc_interrupt_set( &th->interrupt );
-
-    th->pf_worker( th->fetcher, th->req );
-
-    atomic_store( &th->active, false );
-    background_worker_RequestProbe( th->worker );
-    return NULL;
+    FetcherRemoveTask(fetcher, task);
+    TaskDelete(task);
 }
 
-static int StartWorker( input_fetcher_t* fetcher,
-    void( *pf_worker )( input_fetcher_t*, struct fetcher_request* ),
-    struct background_worker* bg, struct fetcher_request* req, void** handle )
+static void RunSearchNetwork(void *userdata)
 {
-    struct fetcher_thread* th = malloc( sizeof *th );
-
-    if( unlikely( !th ) )
-        return VLC_ENOMEM;
-
-    th->req = req;
-    th->worker = bg;
-    th->fetcher = fetcher;
-    th->pf_worker = pf_worker;
+    struct task *task = userdata;
 
-    vlc_interrupt_init( &th->interrupt );
-    atomic_init( &th->active, true );
+    vlc_interrupt_set(&task->interrupt);
 
-    if( !vlc_clone( &th->thread, FetcherThread, th, VLC_THREAD_PRIORITY_LOW ) )
+    if( SearchByScope( task, FETCHER_SCOPE_NETWORK ) != VLC_SUCCESS )
     {
-        *handle = th;
-        return VLC_SUCCESS;
+        input_item_SetArtNotFound( task->item, true );
+        NotifyArtFetchEnded(task, false);
     }
 
-    vlc_interrupt_deinit( &th->interrupt );
-    free( th );
-    return VLC_EGENERIC;
-}
+    vlc_interrupt_set(NULL);
 
-static int ProbeWorker( void* fetcher_, void* th_ )
-{
-    return !atomic_load( &((struct fetcher_thread*)th_)->active );
-    VLC_UNUSED( fetcher_ );
-}
-
-static void CloseWorker( void* fetcher_, void* th_ )
-{
-    struct fetcher_thread* th = th_;
-    VLC_UNUSED( fetcher_ );
-
-    vlc_interrupt_kill( &th->interrupt );
-    vlc_join( th->thread, NULL );
-    vlc_interrupt_deinit( &th->interrupt );
-    free( th );
-}
-
-#define DEF_STARTER(name, worker) \
-static int Start ## name( void* fetcher_, void* req_, void** out ) { \
-    input_fetcher_t* fetcher = fetcher_; \
-    return StartWorker( fetcher, name, worker, req_, out ); }
-
-DEF_STARTER(  SearchLocal, fetcher->local )
-DEF_STARTER(SearchNetwork, fetcher->network )
-DEF_STARTER(   Downloader, fetcher->downloader )
-
-static void WorkerInit( input_fetcher_t* fetcher,
-    struct background_worker** worker, int( *starter )( void*, void*, void** ) )
-{
-    struct background_worker_config conf = {
-        .default_timeout = 0,
-        .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ),
-        .pf_start = starter,
-        .pf_probe = ProbeWorker,
-        .pf_stop = CloseWorker,
-        .pf_release = RequestRelease,
-        .pf_hold = RequestHold };
-
-    *worker = background_worker_New( fetcher, &conf );
+    input_fetcher_t *fetcher = task->fetcher;
+    FetcherRemoveTask(fetcher, task);
+    TaskDelete(task);
 }
 
 input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
@@ -420,65 +435,85 @@ input_fetcher_t* input_fetcher_New( vlc_object_t* owner )
     if( unlikely( !fetcher ) )
         return NULL;
 
-    fetcher->owner = owner;
-
-    WorkerInit( fetcher, &fetcher->local, StartSearchLocal );
-    WorkerInit( fetcher, &fetcher->network, StartSearchNetwork );
-    WorkerInit( fetcher, &fetcher->downloader, StartDownloader );
+    int max_threads = var_InheritInteger(owner, "fetch-art-threads");
+    if (max_threads < 1)
+        max_threads = 1;
 
-    if( unlikely( !fetcher->local || !fetcher->network || !fetcher->downloader ) )
+    fetcher->executor_local = vlc_executor_New(max_threads);
+    if (!fetcher->executor_local)
     {
-        if( fetcher->local )
-            background_worker_Delete( fetcher->local );
-
-        if( fetcher->network )
-            background_worker_Delete( fetcher->network );
+        free(fetcher);
+        return NULL;
+    }
 
-        if( fetcher->downloader )
-            background_worker_Delete( fetcher->downloader );
+    fetcher->executor_network = vlc_executor_New(max_threads);
+    if (!fetcher->executor_network)
+    {
+        vlc_executor_Delete(fetcher->executor_local);
+        free(fetcher);
+        return NULL;
+    }
 
-        free( fetcher );
+    fetcher->executor_downloader = vlc_executor_New(max_threads);
+    if (!fetcher->executor_downloader)
+    {
+        vlc_executor_Delete(fetcher->executor_network);
+        vlc_executor_Delete(fetcher->executor_local);
+        free(fetcher);
         return NULL;
     }
 
-    vlc_mutex_init( &fetcher->lock );
+    fetcher->owner = owner;
+
+    vlc_mutex_init(&fetcher->lock);
+    vlc_list_init(&fetcher->submitted_tasks);
+
     vlc_dictionary_init( &fetcher->album_cache, 0 );
 
     return fetcher;
 }
 
-int input_fetcher_Push( input_fetcher_t* fetcher, input_item_t* item,
+int input_fetcher_Push(input_fetcher_t* fetcher, input_item_t* item,
     input_item_meta_request_option_t options,
-    const input_fetcher_callbacks_t *cbs, void *cbs_userdata )
+    const input_fetcher_callbacks_t *cbs, void *cbs_userdata)
 {
     assert(options & META_REQUEST_OPTION_FETCH_ANY);
-    struct fetcher_request* req = malloc( sizeof *req );
 
-    if( unlikely( !req ) )
-        return VLC_ENOMEM;
+    vlc_executor_t *executor = options & META_REQUEST_OPTION_FETCH_LOCAL
+                             ? fetcher->executor_local
+                             : fetcher->executor_network;
 
-    req->item = item;
-    req->options = options;
-    req->cbs = cbs;
-    req->userdata = cbs_userdata;
+    return Submit(fetcher, executor, item, options, cbs, cbs_userdata);
+}
 
-    vlc_atomic_rc_init( &req->rc );
-    input_item_Hold( item );
+static void
+CancelAllTasks(input_fetcher_t *fetcher)
+{
+    vlc_mutex_lock(&fetcher->lock);
 
-    struct background_worker* worker =
-        options & META_REQUEST_OPTION_FETCH_LOCAL ? fetcher->local : fetcher->network;
-    if( background_worker_Push( worker, req, NULL, 0 ) )
-        NotifyArtFetchEnded(req, false);
+    struct task *task;
+    vlc_list_foreach(task, &fetcher->submitted_tasks, node)
+    {
+        bool canceled = vlc_executor_Cancel(task->executor, &task->runnable);
+        if (canceled)
+        {
+            NotifyArtFetchEnded(task, false);
+            vlc_list_remove(&task->node);
+            TaskDelete(task);
+        }
+        /* Otherwise, the task will be finished after run() */
+    }
 
-    RequestRelease( req );
-    return VLC_SUCCESS;
+    vlc_mutex_unlock(&fetcher->lock);
 }
 
 void input_fetcher_Delete( input_fetcher_t* fetcher )
 {
-    background_worker_Delete( fetcher->local );
-    background_worker_Delete( fetcher->network );
-    background_worker_Delete( fetcher->downloader );
+    CancelAllTasks(fetcher);
+
+    vlc_executor_Delete(fetcher->executor_local);
+    vlc_executor_Delete(fetcher->executor_network);
+    vlc_executor_Delete(fetcher->executor_downloader);
 
     vlc_dictionary_clear( &fetcher->album_cache, FreeCacheEntry, NULL );
     free( fetcher );
-- 
2.28.0



More information about the vlc-devel mailing list