[vlc-devel] commit: Sanitize/rewrite the message subscription API ( Rémi Denis-Courmont )

git version control git at videolan.org
Sun Oct 12 21:45:27 CEST 2008


vlc | branch: master | Rémi Denis-Courmont <rdenis at simphalempin.com> | Sun Oct 12 21:00:46 2008 +0300| [9745c4a826770b11f5dcf74fc7b4593203e6958b] | committer: Rémi Denis-Courmont 

Sanitize/rewrite the message subscription API

> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=9745c4a826770b11f5dcf74fc7b4593203e6958b
---

 include/vlc_messages.h |   22 +++----
 src/libvlc.h           |    5 +-
 src/libvlccore.sym     |    4 +-
 src/misc/messages.c    |  148 ++++++++++++++++++++++++++++++++++++------------
 4 files changed, 126 insertions(+), 53 deletions(-)

diff --git a/include/vlc_messages.h b/include/vlc_messages.h
index b755b89..4d0d507 100644
--- a/include/vlc_messages.h
+++ b/include/vlc_messages.h
@@ -71,14 +71,6 @@ typedef struct
  * Used by interface plugins which subscribe to the message bank.
  */
 typedef struct msg_subscription_t msg_subscription_t;
-struct msg_subscription_t
-{
-    int   i_start;
-    int*  pi_stop;
-
-    msg_item_t*  p_msg;
-    vlc_mutex_t* p_lock;
-};
 
 /*****************************************************************************
  * Prototypes
@@ -100,10 +92,16 @@ VLC_EXPORT( void, __msg_GenericVa, ( vlc_object_t *, int, const char *, const ch
       __msg_Generic( VLC_OBJECT(p_this), VLC_MSG_DBG, \
                      MODULE_STRING, __VA_ARGS__ )
 
-#define msg_Subscribe(a) __msg_Subscribe(VLC_OBJECT(a))
-#define msg_Unsubscribe(a,b) __msg_Unsubscribe(VLC_OBJECT(a),b)
-VLC_EXPORT( msg_subscription_t*, __msg_Subscribe, ( vlc_object_t * ) );
-VLC_EXPORT( void, __msg_Unsubscribe, ( vlc_object_t *, msg_subscription_t * ) );
+typedef struct msg_cb_data_t msg_cb_data_t;
+
+/**
+ * Message logging callback signature.
+ * Accepts one private data pointer, the message, and an overrun counter.
+ */
+typedef void (*msg_callback_t) (msg_cb_data_t *, msg_item_t *, unsigned);
+
+VLC_EXPORT( msg_subscription_t*, msg_Subscribe, ( libvlc_int_t *, msg_callback_t, msg_cb_data_t * ) );
+VLC_EXPORT( void, msg_Unsubscribe, ( msg_subscription_t * ) );
 
 /* Enable or disable a certain object debug messages */
 #define msg_EnableObjectPrinting(a,b) __msg_EnableObjectPrinting(VLC_OBJECT(a),b)
diff --git a/src/libvlc.h b/src/libvlc.h
index 94c202c..ddeda34 100644
--- a/src/libvlc.h
+++ b/src/libvlc.h
@@ -76,13 +76,14 @@ uint32_t CPUCapabilities( void );
 typedef struct msg_bank_t
 {
     /** Message queue lock */
-    vlc_mutex_t             lock;
-    bool              b_overflow;
+    vlc_mutex_t lock;
+    vlc_cond_t  wait;
 
     /* Message queue */
     msg_item_t              msg[VLC_MSG_QSIZE];           /**< message queue */
     int i_start;
     int i_stop;
+    bool b_overflow;
 
     /* Subscribers */
     int i_sub;
diff --git a/src/libvlccore.sym b/src/libvlccore.sym
index 2da80b6..99e5328 100644
--- a/src/libvlccore.sym
+++ b/src/libvlccore.sym
@@ -223,8 +223,8 @@ __msg_DisableObjectPrinting
 __msg_EnableObjectPrinting
 __msg_Generic
 __msg_GenericVa
-__msg_Subscribe
-__msg_Unsubscribe
+msg_Subscribe
+msg_Unsubscribe
 msleep
 mstrtime
 mwait
diff --git a/src/misc/messages.c b/src/misc/messages.c
index ba93614..81c8aba 100644
--- a/src/misc/messages.c
+++ b/src/misc/messages.c
@@ -76,6 +76,10 @@ static uintptr_t banks = 0;
 #endif
 
 #define QUEUE priv->msg_bank
+static inline msg_bank_t *libvlc_bank (libvlc_int_t *inst)
+{
+    return &(libvlc_priv (inst))->msg_bank;
+}
 
 /*****************************************************************************
  * Local prototypes
@@ -94,7 +98,10 @@ static vlc_mutex_t msg_stack_lock = VLC_STATIC_MUTEX;
 void msg_Create (libvlc_int_t *p_libvlc)
 {
     libvlc_priv_t *priv = libvlc_priv (p_libvlc);
-    vlc_mutex_init( &QUEUE.lock );
+    msg_bank_t *bank = libvlc_bank (p_libvlc);
+
+    vlc_mutex_init (&bank->lock);
+    vlc_cond_init (&bank->wait);
     vlc_dictionary_init( &priv->msg_enabled_objects, 0 );
     priv->msg_all_objects_enabled = true;
 
@@ -168,9 +175,10 @@ void __msg_DisableObjectPrinting (vlc_object_t *p_this, char * psz_object)
 void msg_Destroy (libvlc_int_t *p_libvlc)
 {
     libvlc_priv_t *priv = libvlc_priv (p_libvlc);
+    msg_bank_t *bank = libvlc_bank (p_libvlc);
 
     if( QUEUE.i_sub )
-        msg_Err( p_libvlc, "stale interface subscribers" );
+        msg_Err( p_libvlc, "stale interface subscribers (VLC might crash)" );
 
     FlushMsg( &QUEUE );
 
@@ -185,52 +193,101 @@ void msg_Destroy (libvlc_int_t *p_libvlc)
 
     vlc_dictionary_clear( &priv->msg_enabled_objects, NULL, NULL );
 
-    /* Destroy lock */
-    vlc_mutex_destroy( &QUEUE.lock );
+    vlc_cond_destroy (&bank->wait);
+    vlc_mutex_destroy (&bank->lock);
+}
+
+struct msg_subscription_t
+{
+    vlc_thread_t    thread;
+    libvlc_int_t   *instance;
+    msg_callback_t  func;
+    msg_cb_data_t  *opaque;
+    msg_item_t     *items[VLC_MSG_QSIZE];
+    unsigned        begin, end;
+    unsigned        overruns;
+};
+
+static void *msg_thread (void *data)
+{
+    msg_subscription_t *sub = data;
+    msg_bank_t *bank = libvlc_bank (sub->instance);
+
+    /* TODO: finer-grained locking and/or msg_item_t refcount */
+    vlc_mutex_lock (&bank->lock);
+    mutex_cleanup_push (&bank->lock);
+
+    for (;;)
+    {
+        /* Wait for messages */
+        assert (sub->begin < VLC_MSG_QSIZE);
+        assert (sub->end < VLC_MSG_QSIZE);
+        while (sub->begin != sub->end)
+        {
+            sub->func (sub->opaque, sub->items[sub->begin], sub->overruns);
+            if (++sub->begin == VLC_MSG_QSIZE)
+                sub->begin = 0;
+            sub->overruns = 0;
+        }
+        vlc_cond_wait (&bank->wait, &bank->lock);
+    }
+
+    vlc_cleanup_pop ();
+    assert (0);
 }
 
 /**
- * Subscribe to a message queue.
+ * Subscribe to the message queue.
+ * Whenever a message is emitted, a callback will be called.
+ * Callback invocations are serialized within a subscription.
+ *
+ * @param instance LibVLC instance to get messages from
+ * @param cb callback function
+ * @param opaque data for the callback function
+ * @return a subscription pointer, or NULL in case of failure
  */
-msg_subscription_t *__msg_Subscribe( vlc_object_t *p_this )
+msg_subscription_t *msg_Subscribe (libvlc_int_t *instance, msg_callback_t cb,
+                                   msg_cb_data_t *opaque)
 {
-    libvlc_priv_t *priv = libvlc_priv (p_this->p_libvlc);
-    msg_subscription_t *p_sub = malloc( sizeof( msg_subscription_t ) );
-
-    if (p_sub == NULL)
+    msg_subscription_t *sub = malloc (sizeof (*sub));
+    if (sub == NULL)
         return NULL;
 
-    vlc_mutex_lock( &QUEUE.lock );
-
-    TAB_APPEND( QUEUE.i_sub, QUEUE.pp_sub, p_sub );
+    sub->instance = instance;
+    sub->func = cb;
+    sub->opaque = opaque;
+    sub->begin = sub->end = sub->overruns = 0;
 
-    p_sub->i_start = QUEUE.i_start;
-    p_sub->pi_stop = &QUEUE.i_stop;
-    p_sub->p_msg   = QUEUE.msg;
-    p_sub->p_lock  = &QUEUE.lock;
+    if (vlc_clone (&sub->thread, msg_thread, sub, VLC_THREAD_PRIORITY_LOW))
+    {
+        free (sub);
+        return NULL;
+    }
 
-    vlc_mutex_unlock( &QUEUE.lock );
+    msg_bank_t *bank = libvlc_bank (instance);
+    vlc_mutex_lock (&bank->lock);
+    TAB_APPEND (bank->i_sub, bank->pp_sub, sub);
+    vlc_mutex_unlock (&bank->lock);
 
-    return p_sub;
+    return sub;
 }
 
 /**
- * Unsubscribe from a message queue.
+ * Unsubscribe from the message queue.
+ * This function waits for the message callback to return if needed.
  */
-void __msg_Unsubscribe( vlc_object_t *p_this, msg_subscription_t *p_sub )
+void msg_Unsubscribe (msg_subscription_t *sub)
 {
-    libvlc_priv_t *priv = libvlc_priv (p_this->p_libvlc);
+    msg_bank_t *bank = libvlc_bank (sub->instance);
 
-    vlc_mutex_lock( &QUEUE.lock );
-    for( int j = 0 ; j< QUEUE.i_sub ; j++ )
-    {
-        if( QUEUE.pp_sub[j] == p_sub )
-        {
-            REMOVE_ELEM( QUEUE.pp_sub, QUEUE.i_sub, j );
-            free( p_sub );
-        }
-    }
-    vlc_mutex_unlock( &QUEUE.lock );
+    /* TODO: flush support? */
+    vlc_cancel (sub->thread);
+    vlc_mutex_lock (&bank->lock);
+    TAB_REMOVE (bank->i_sub, bank->pp_sub, sub);
+    vlc_mutex_unlock (&bank->lock);
+
+    vlc_join (sub->thread, NULL);
+    free (sub);
 }
 
 /*****************************************************************************
@@ -469,15 +526,32 @@ static void QueueMsg( vlc_object_t *p_this, int i_type, const char *psz_module,
     p_item->psz_header =    psz_header;
 
     PrintMsg( p_this, p_item );
-
-    if( p_queue->b_overflow )
+#define bank p_queue
+    if( p_item == &item )
     {
         free( p_item->psz_module );
         free( p_item->psz_msg );
         free( p_item->psz_header );
+        for (int i = 0; i < bank->i_sub; i++)
+            bank->pp_sub[i]->overruns++;
     }
-
-    vlc_mutex_unlock ( &p_queue->lock );
+    else
+    {
+        for (int i = 0; i < bank->i_sub; i++)
+        {
+            msg_subscription_t *sub = bank->pp_sub[i];
+            if ((sub->end + 1 - sub->begin) % VLC_MSG_QSIZE)
+            {
+                sub->items[sub->end++] = p_item;
+                if (sub->end == VLC_MSG_QSIZE)
+                    sub->end = 0;
+            }
+            else
+                sub->overruns++;
+        }
+        vlc_cond_broadcast (&bank->wait);
+    }
+    vlc_mutex_unlock (&bank->lock);
 }
 
 /* following functions are local */
@@ -498,7 +572,7 @@ static void FlushMsg ( msg_bank_t *p_queue )
     /* Check until which value we can free messages */
     for( i_index = 0; i_index < p_queue->i_sub; i_index++ )
     {
-        i_start = p_queue->pp_sub[ i_index ]->i_start;
+        i_start = p_queue->pp_sub[ i_index ]->begin;
 
         /* If this subscriber is late, we don't free messages before
          * his i_start value, otherwise he'll miss messages */




More information about the vlc-devel mailing list