[vlc-commits] preparser: use vlc_executor_t
Romain Vimont
git at videolan.org
Fri Oct 23 17:48:05 CEST 2020
vlc | branch: master | Romain Vimont <rom1v at videolabs.io> | Tue Sep 1 18:09:09 2020 +0200| [5386f8eea902906c2e359f25976847a9745f4648] | committer: Alexandre Janniaux
preparser: use vlc_executor_t
Replace the background worker by an executor.
Signed-off-by: Alexandre Janniaux <ajanni at videolabs.io>
> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=5386f8eea902906c2e359f25976847a9745f4648
---
src/preparser/preparser.c | 401 +++++++++++++++++++++++++++++-----------------
1 file changed, 251 insertions(+), 150 deletions(-)
diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c
index 723feb96dc..e5bd0b87cb 100644
--- a/src/preparser/preparser.c
+++ b/src/preparser/preparser.c
@@ -24,8 +24,8 @@
#include <vlc_common.h>
#include <vlc_atomic.h>
+#include <vlc_executor.h>
-#include "misc/background_worker.h"
#include "input/input_interface.h"
#include "input/input_internal.h"
#include "preparser.h"
@@ -35,222 +35,299 @@ struct input_preparser_t
{
vlc_object_t* owner;
input_fetcher_t* fetcher;
- struct background_worker* worker;
+ vlc_executor_t *executor;
+ vlc_tick_t default_timeout;
atomic_bool deactivated;
+
+ vlc_mutex_t lock;
+ struct vlc_list submitted_tasks; /**< list of struct task */
};
-typedef struct input_preparser_req_t
+struct task
{
+ input_preparser_t *preparser;
input_item_t *item;
input_item_meta_request_option_t options;
const input_preparser_callbacks_t *cbs;
void *userdata;
- vlc_atomic_rc_t rc;
-} input_preparser_req_t;
+ void *id;
+ vlc_tick_t timeout;
-typedef struct input_preparser_task_t
-{
- input_preparser_req_t *req;
- input_preparser_t* preparser;
- int preparse_status;
input_item_parser_id_t *parser;
- atomic_int state;
- atomic_bool done;
-} input_preparser_task_t;
-
-static input_preparser_req_t *ReqCreate(input_item_t *item,
- input_item_meta_request_option_t options,
- const input_preparser_callbacks_t *cbs,
- void *userdata)
+
+ vlc_mutex_t lock;
+ vlc_cond_t cond_ended;
+ bool preparse_ended;
+ int preparse_status;
+ bool fetch_ended;
+
+ atomic_bool interrupted;
+
+ struct vlc_runnable runnable; /**< to be passed to the executor */
+
+ struct vlc_list node; /**< node of input_preparser_t.submitted_tasks */
+};
+
+static void RunnableRun(void *);
+
+static struct task *
+TaskNew(input_preparser_t *preparser, input_item_t *item,
+ input_item_meta_request_option_t options,
+ const input_preparser_callbacks_t *cbs, void *userdata,
+ void *id, vlc_tick_t timeout)
{
- input_preparser_req_t *req = malloc(sizeof(*req));
- if (unlikely(!req))
+ assert(timeout >= 0);
+
+ struct task *task = malloc(sizeof(*task));
+ if (!task)
return NULL;
- req->item = item;
- req->options = options;
- req->cbs = cbs;
- req->userdata = userdata;
- vlc_atomic_rc_init(&req->rc);
+ task->preparser = preparser;
+ task->item = item;
+ task->options = options;
+ task->cbs = cbs;
+ task->userdata = userdata;
+ task->id = id;
+ task->timeout = timeout;
input_item_Hold(item);
- return req;
+ task->parser = NULL;
+ vlc_mutex_init(&task->lock);
+ vlc_cond_init(&task->cond_ended);
+ task->preparse_ended = false;
+ task->preparse_status = ITEM_PREPARSE_SKIPPED;
+ task->fetch_ended = false;
+
+ atomic_init(&task->interrupted, false);
+
+ task->runnable.run = RunnableRun;
+ task->runnable.userdata = task;
+
+ return task;
}
-static void ReqHold(input_preparser_req_t *req)
+static void
+TaskDelete(struct task *task)
{
- vlc_atomic_rc_inc(&req->rc);
+ input_item_Release(task->item);
+ free(task);
}
-static void ReqRelease(input_preparser_req_t *req)
+static void
+PreparserAddTask(input_preparser_t *preparser, struct task *task)
{
- if (vlc_atomic_rc_dec(&req->rc))
- {
- input_item_Release(req->item);
- free(req);
- }
+ vlc_mutex_lock(&preparser->lock);
+ vlc_list_append(&task->node, &preparser->submitted_tasks);
+ vlc_mutex_unlock(&preparser->lock);
}
-static void OnParserEnded(input_item_t *item, int status, void *task_)
+static void
+PreparserRemoveTask(input_preparser_t *preparser, struct task *task)
{
- VLC_UNUSED(item);
- input_preparser_task_t* task = task_;
-
- atomic_store( &task->state, status );
- atomic_store( &task->done, true );
- background_worker_RequestProbe( task->preparser->worker );
+ vlc_mutex_lock(&preparser->lock);
+ vlc_list_remove(&task->node);
+ vlc_mutex_unlock(&preparser->lock);
}
-static void OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
- void *task_)
+static void
+NotifyPreparseEnded(struct task *task)
{
- VLC_UNUSED(item);
- input_preparser_task_t* task = task_;
- input_preparser_req_t *req = task->req;
-
- if (req->cbs && req->cbs->on_subtree_added)
- req->cbs->on_subtree_added(req->item, subtree, req->userdata);
+ if (task->cbs && task->cbs->on_preparse_ended)
+ task->cbs->on_preparse_ended(task->item, task->preparse_status,
+ task->userdata);
}
-static int PreparserOpenInput( void* preparser_, void* req_, void** out )
+static void
+OnParserEnded(input_item_t *item, int status, void *task_)
{
- input_preparser_t* preparser = preparser_;
- input_preparser_req_t *req = req_;
- input_preparser_task_t* task = malloc( sizeof *task );
-
- if( unlikely( !task ) )
- goto error;
-
- static const input_item_parser_cbs_t cbs = {
- .on_ended = OnParserEnded,
- .on_subtree_added = OnParserSubtreeAdded,
- };
-
- atomic_init( &task->state, VLC_ETIMEOUT );
- atomic_init( &task->done, false );
-
- task->preparser = preparser_;
- task->req = req;
- task->preparse_status = -1;
- task->parser = input_item_Parse( req->item, preparser->owner, &cbs,
- task );
- if( !task->parser )
- goto error;
+ VLC_UNUSED(item);
+ struct task *task = task_;
- *out = task;
+ if (atomic_load(&task->interrupted))
+ /*
+ * On interruption, the call to input_item_parser_id_Release() may
+ * trigger this "parser ended" callback. Ignore it.
+ */
+ return;
- return VLC_SUCCESS;
+ vlc_mutex_lock(&task->lock);
+ assert(!task->preparse_ended);
+ task->preparse_status = status == VLC_SUCCESS ? ITEM_PREPARSE_DONE
+ : ITEM_PREPARSE_FAILED;
+ task->preparse_ended = true;
+ vlc_mutex_unlock(&task->lock);
-error:
- free( task );
- if (req->cbs && req->cbs->on_preparse_ended)
- req->cbs->on_preparse_ended(req->item, ITEM_PREPARSE_FAILED, req->userdata);
- return VLC_EGENERIC;
+ vlc_cond_signal(&task->cond_ended);
}
-static int PreparserProbeInput( void* preparser_, void* task_ )
+static void
+OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
+ void *task_)
{
- input_preparser_task_t* task = task_;
- return atomic_load( &task->done );
- VLC_UNUSED( preparser_ );
+ VLC_UNUSED(item);
+ struct task *task = task_;
+
+ if (task->cbs && task->cbs->on_subtree_added)
+ task->cbs->on_subtree_added(task->item, subtree, task->userdata);
}
-static void on_art_fetch_ended(input_item_t *item, bool fetched, void *userdata)
+static void
+OnArtFetchEnded(input_item_t *item, bool fetched, void *userdata)
{
VLC_UNUSED(item);
VLC_UNUSED(fetched);
- input_preparser_task_t *task = userdata;
- input_preparser_req_t *req = task->req;
- input_item_SetPreparsed(req->item, true);
+ struct task *task = userdata;
- if (req->cbs && req->cbs->on_preparse_ended)
- req->cbs->on_preparse_ended(req->item, task->preparse_status, req->userdata);
+ vlc_mutex_lock(&task->lock);
+ assert(!task->fetch_ended);
+ task->fetch_ended = true;
+ vlc_mutex_unlock(&task->lock);
- ReqRelease(req);
- free(task);
+ vlc_cond_signal(&task->cond_ended);
}
static const input_fetcher_callbacks_t input_fetcher_callbacks = {
- .on_art_fetch_ended = on_art_fetch_ended,
+ .on_art_fetch_ended = OnArtFetchEnded,
};
-static void PreparserCloseInput( void* preparser_, void* task_ )
+static void
+Parse(struct task *task, vlc_tick_t deadline)
{
- input_preparser_task_t* task = task_;
- input_preparser_req_t *req = task->req;
-
- input_preparser_t* preparser = preparser_;
- input_item_t* item = req->item;
+ static const input_item_parser_cbs_t cbs = {
+ .on_ended = OnParserEnded,
+ .on_subtree_added = OnParserSubtreeAdded,
+ };
- int status;
- switch( atomic_load( &task->state ) )
+ vlc_object_t *obj = task->preparser->owner;
+ task->parser = input_item_Parse(task->item, obj, &cbs, task);
+ if (!task->parser)
{
- case VLC_SUCCESS:
- status = ITEM_PREPARSE_DONE;
- break;
- case VLC_ETIMEOUT:
- status = ITEM_PREPARSE_TIMEOUT;
- break;
- default:
- status = ITEM_PREPARSE_FAILED;
- break;
+ task->preparse_status = ITEM_PREPARSE_FAILED;
+ return;
}
- input_item_parser_id_Release( task->parser );
-
- if( preparser->fetcher && (req->options & META_REQUEST_OPTION_FETCH_ANY) )
+ /* Wait until the end of parsing */
+ vlc_mutex_lock(&task->lock);
+ if (deadline == VLC_TICK_INVALID)
+ {
+ while (!task->preparse_ended)
+ vlc_cond_wait(&task->cond_ended, &task->lock);
+ }
+ else
{
- task->preparse_status = status;
- ReqHold(task->req);
- if (!input_fetcher_Push(preparser->fetcher, item,
- req->options & META_REQUEST_OPTION_FETCH_ANY,
- &input_fetcher_callbacks, task))
+ bool timeout = false;
+ while (!task->preparse_ended && !timeout)
+ timeout =
+ vlc_cond_timedwait(&task->cond_ended, &task->lock, deadline);
+
+ if (timeout)
{
- return;
+ task->preparse_status = ITEM_PREPARSE_TIMEOUT;
+ atomic_store(&task->interrupted, true);
}
- ReqRelease(task->req);
}
+ vlc_mutex_unlock(&task->lock);
- free(task);
+ /* This call also interrupts the parsing if it is still running */
+ input_item_parser_id_Release(task->parser);
+}
+
+static void
+Fetch(struct task *task)
+{
+ input_fetcher_t *fetcher = task->preparser->fetcher;
+ if (!fetcher || !(task->options & META_REQUEST_OPTION_FETCH_ANY))
+ return;
+
+ int ret =
+ input_fetcher_Push(fetcher, task->item,
+ task->options & META_REQUEST_OPTION_FETCH_ANY,
+ &input_fetcher_callbacks, task);
+ if (ret != VLC_SUCCESS)
+ return;
+
+ /* Wait until the end of fetching (fetching is not interruptible) */
+ vlc_mutex_lock(&task->lock);
+ while (!task->fetch_ended)
+ vlc_cond_wait(&task->cond_ended, &task->lock);
+ vlc_mutex_unlock(&task->lock);
+}
+
+static void
+RunnableRun(void *userdata)
+{
+ struct task *task = userdata;
+
+ vlc_tick_t deadline = task->timeout ? vlc_tick_now() + task->timeout
+ : VLC_TICK_INVALID;
+
+ if (atomic_load(&task->interrupted))
+ goto end;
- input_item_SetPreparsed( item, true );
- if (req->cbs && req->cbs->on_preparse_ended)
- req->cbs->on_preparse_ended(req->item, status, req->userdata);
+ Parse(task, deadline);
+
+ if (atomic_load(&task->interrupted))
+ goto end;
+
+ Fetch(task);
+
+ if (atomic_load(&task->interrupted))
+ goto end;
+
+ input_item_SetPreparsed(task->item, true);
+
+end:
+ NotifyPreparseEnded(task);
+ input_preparser_t *preparser = task->preparser;
+ PreparserRemoveTask(preparser, task);
+ TaskDelete(task);
}
-static void ReqHoldVoid(void *item) { ReqHold(item); }
-static void ReqReleaseVoid(void *item) { ReqRelease(item); }
+static void
+Interrupt(struct task *task)
+{
+ assert(!atomic_load(&task->interrupted));
+ atomic_store(&task->interrupted, true);
+
+ /* Wake up the preparser cond_wait */
+ vlc_mutex_lock(&task->lock);
+ task->preparse_status = ITEM_PREPARSE_TIMEOUT;
+ task->preparse_ended = true;
+ vlc_mutex_unlock(&task->lock);
+ vlc_cond_signal(&task->cond_ended);
+}
input_preparser_t* input_preparser_New( vlc_object_t *parent )
{
input_preparser_t* preparser = malloc( sizeof *preparser );
+ if (!preparser)
+ return NULL;
- struct background_worker_config conf = {
- .default_timeout = VLC_TICK_FROM_MS(var_InheritInteger( parent, "preparse-timeout" )),
- .max_threads = var_InheritInteger( parent, "preparse-threads" ),
- .pf_start = PreparserOpenInput,
- .pf_probe = PreparserProbeInput,
- .pf_stop = PreparserCloseInput,
- .pf_release = ReqReleaseVoid,
- .pf_hold = ReqHoldVoid
- };
-
-
- if( likely( preparser ) )
- preparser->worker = background_worker_New( preparser, &conf );
+ int max_threads = var_InheritInteger(parent, "preparse-threads");
+ if (max_threads < 1)
+ max_threads = 1;
- if( unlikely( !preparser || !preparser->worker ) )
+ preparser->executor = vlc_executor_New(max_threads);
+ if (!preparser->executor)
{
- free( preparser );
+ free(preparser);
return NULL;
}
+ preparser->default_timeout =
+ VLC_TICK_FROM_MS(var_InheritInteger(parent, "preparse-timeout"));
+ if (preparser->default_timeout < 0)
+ preparser->default_timeout = 0;
+
preparser->owner = parent;
preparser->fetcher = input_fetcher_New( parent );
atomic_init( &preparser->deactivated, false );
+ vlc_mutex_init(&preparser->lock);
+ vlc_list_init(&preparser->submitted_tasks);
+
if( unlikely( !preparser->fetcher ) )
msg_Warn( parent, "unable to create art fetcher" );
@@ -260,7 +337,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent )
void input_preparser_Push( input_preparser_t *preparser,
input_item_t *item, input_item_meta_request_option_t i_options,
const input_preparser_callbacks_t *cbs, void *cbs_userdata,
- int timeout, void *id )
+ int timeout_ms, void *id )
{
if( atomic_load( &preparser->deactivated ) )
return;
@@ -287,14 +364,14 @@ void input_preparser_Push( input_preparser_t *preparser,
return;
}
- struct input_preparser_req_t *req = ReqCreate(item, i_options,
- cbs, cbs_userdata);
+ vlc_tick_t timeout = timeout_ms == -1 ? preparser->default_timeout
+ : VLC_TICK_FROM_MS(timeout_ms);
+ struct task *task =
+ TaskNew(preparser, item, i_options, cbs, cbs_userdata, id, timeout);
- if (background_worker_Push(preparser->worker, req, id, timeout))
- if (req->cbs && cbs->on_preparse_ended)
- cbs->on_preparse_ended(item, ITEM_PREPARSE_FAILED, cbs_userdata);
+ PreparserAddTask(preparser, task);
- ReqRelease(req);
+ vlc_executor_Submit(preparser->executor, &task->runnable);
}
void input_preparser_fetcher_Push( input_preparser_t *preparser,
@@ -308,18 +385,42 @@ void input_preparser_fetcher_Push( input_preparser_t *preparser,
void input_preparser_Cancel( input_preparser_t *preparser, void *id )
{
- background_worker_Cancel( preparser->worker, id );
+ vlc_mutex_lock(&preparser->lock);
+
+ struct task *task;
+ vlc_list_foreach(task, &preparser->submitted_tasks, node)
+ {
+ if (!id || task->id == id)
+ {
+ bool canceled =
+ vlc_executor_Cancel(preparser->executor, &task->runnable);
+ if (canceled)
+ {
+ NotifyPreparseEnded(task);
+ vlc_list_remove(&task->node);
+ TaskDelete(task);
+ }
+ else
+ /* The task will be finished and destroyed after run() */
+ Interrupt(task);
+ }
+ }
+
+ vlc_mutex_unlock(&preparser->lock);
}
void input_preparser_Deactivate( input_preparser_t* preparser )
{
atomic_store( &preparser->deactivated, true );
- background_worker_Cancel( preparser->worker, NULL );
+ input_preparser_Cancel(preparser, NULL);
}
void input_preparser_Delete( input_preparser_t *preparser )
{
- background_worker_Delete( preparser->worker );
+ /* In case input_preparser_Deactivate() has not been called */
+ input_preparser_Cancel(preparser, NULL);
+
+ vlc_executor_Delete(preparser->executor);
if( preparser->fetcher )
input_fetcher_Delete( preparser->fetcher );
More information about the vlc-commits
mailing list