[vlc-devel] [PATCH 2/2] access_out: srt: cancellable by block_cleanup_push in thread

Justin Kim justin.kim at collabora.com
Thu Jan 11 09:46:42 CET 2018


To support Win32, this patch removes platform dependent
APIs. Although SRT provides poll-like mechanism, it doesn't
support Win32 too. Therefore, in this patch, srt module
sends packets in blocking-mode in a thread which is cancellable
by clean-up handler of VLC.

Signed-off-by: Justin Kim <justin.kim at collabora.com>
---
 modules/access_output/srt.c | 147 ++++++++++++++++----------------------------
 1 file changed, 52 insertions(+), 95 deletions(-)

diff --git a/modules/access_output/srt.c b/modules/access_output/srt.c
index 904d8f56d0..9f136e374c 100644
--- a/modules/access_output/srt.c
+++ b/modules/access_output/srt.c
@@ -42,8 +42,6 @@
 #define SRT_DEFAULT_CHUNK_SIZE 1316
 /* libsrt tutorial uses 9000 as a default binding port */
 #define SRT_DEFAULT_PORT 9000
-/* The default timeout is -1 (infinite) */
-#define SRT_DEFAULT_POLL_TIMEOUT 100
 /* The default latency is 125
  * which uses srt library internally */
 #define SRT_DEFAULT_LATENCY 125
@@ -61,22 +59,36 @@ static const char *const srt_key_length_names[] = {
 struct sout_access_out_sys_t
 {
     SRTSOCKET     sock;
-    int           i_poll_id;
-    int           i_poll_timeout;
-    int           i_latency;
-    size_t        i_chunk_size;
-    int           i_pipe_fds[2];
+    vlc_thread_t  thread;
+    block_fifo_t *fifo;
 };
 
-static void srt_wait_interrupted(void *p_data)
+static void *Thread(void *p_data)
 {
     sout_access_out_t *p_access = p_data;
     sout_access_out_sys_t *p_sys = p_access->p_sys;
-    msg_Dbg( p_access, "Waking up srt_epoll_wait");
-    if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
+    size_t i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
+    for(;;)
     {
-        msg_Err( p_access, "Failed to send data to event fd");
+        block_t *pkt = block_FifoGet( p_sys->fifo );
+
+        while( pkt->i_buffer )
+        {
+            size_t i_write = __MIN( pkt->i_buffer, i_chunk_size );
+
+            block_cleanup_push( pkt );
+            if ( srt_sendmsg2( p_sys->sock, (char *)pkt->p_buffer, i_write, 0 ) == SRT_ERROR )
+                msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
+            vlc_cleanup_pop();
+
+            pkt->p_buffer += i_write;
+            pkt->i_buffer -= i_write;
+        }
+
+        block_Release( pkt );
     }
+
+    return NULL;
 }
 
 static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
@@ -84,60 +96,16 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
     sout_access_out_sys_t *p_sys = p_access->p_sys;
     int i_len = 0;
 
-    vlc_interrupt_register( srt_wait_interrupted, p_access);
-
     while( p_buffer )
     {
-        block_t *p_next;
-
+        block_t *p_next = p_buffer->p_next;
         i_len += p_buffer->i_buffer;
 
-        while( p_buffer->i_buffer )
-        {
-            size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_chunk_size );
-            SRTSOCKET ready[2];
-
-retry:
-            if ( srt_epoll_wait( p_sys->i_poll_id,
-                0, 0, ready, &(int){ 2 }, p_sys->i_poll_timeout,
-                &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
-            {
-                /* Assuming that timeout error is normal when SRT socket is connected. */
-                if ( srt_getlasterror( NULL ) == SRT_ETIMEOUT &&
-                     srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
-                {
-                    srt_clearlasterror();
-                    goto retry;
-                }
-
-                i_len = VLC_EGENERIC;
-                goto out;
-            }
-
-            bool cancel = 0;
-            int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
-            if ( ret > 0 && cancel )
-            {
-                msg_Dbg( p_access, "Cancelled running" );
-                i_len = 0;
-                goto out;
-            }
-
-            if ( srt_sendmsg2( p_sys->sock, (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
-                msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
+        block_FifoPut( p_sys->fifo, p_buffer );
 
-            p_buffer->p_buffer += i_write;
-            p_buffer->i_buffer -= i_write;
-        }
-
-        p_next = p_buffer->p_next;
-        block_Release( p_buffer );
         p_buffer = p_next;
     }
 
-out:
-    vlc_interrupt_unregister();
-    if ( i_len <= 0 ) block_ChainRelease( p_buffer );
     return i_len;
 }
 
@@ -170,6 +138,7 @@ static int Open( vlc_object_t *p_this )
     int                      i_dst_port;
     int                      stat;
     char                    *psz_passphrase = NULL;
+    int                      i_latency;
 
     struct addrinfo hints = {
         .ai_socktype = SOCK_DGRAM,
@@ -184,14 +153,9 @@ static int Open( vlc_object_t *p_this )
         return VLC_ENOMEM;
     }
 
-    if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
+    if( !( p_sys = calloc (1, sizeof( *p_sys ) ) ) )
         return VLC_ENOMEM;
 
-    p_sys->i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
-    p_sys->i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
-    p_sys->i_latency = var_InheritInteger( p_access, "latency" );
-    p_sys->i_poll_id = -1;
-
     p_access->p_sys = p_sys;
 
     psz_passphrase = var_InheritString( p_access, "passphrase" );
@@ -235,8 +199,8 @@ static int Open( vlc_object_t *p_this )
         goto failed;
     }
 
-    /* Make SRT non-blocking */
-    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
+    /* Make SRT blocking */
+    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { true }, sizeof( bool ) );
 
     /* Make sure TSBPD mode is enable (SRT mode) */
     srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
@@ -245,7 +209,8 @@ static int Open( vlc_object_t *p_this )
     srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof( int ) );
 
     /* Set latency */
-    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
+    i_latency = var_InheritInteger( p_access, "latency" );
+    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &i_latency, sizeof( int ) );
 
     if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
     {
@@ -256,27 +221,8 @@ static int Open( vlc_object_t *p_this )
             &i_key_length, sizeof( int ) );
     }
 
-    p_sys->i_poll_id = srt_epoll_create();
-    if ( p_sys->i_poll_id == -1 )
-    {
-        msg_Err( p_access, "Failed to create poll id for SRT socket (reason: %s)",
-                 srt_getlasterror_str() );
-
-        goto failed;
-    }
-
-    if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
-    {
-        msg_Err( p_access, "Failed to create pipe fds." );
-        goto failed;
-    }
-    fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
-
-    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_OUT });
     srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
 
-    srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
-
     stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
     if ( stat == SRT_ERROR )
     {
@@ -288,6 +234,19 @@ static int Open( vlc_object_t *p_this )
     p_access->pf_write = Write;
     p_access->pf_control = Control;
 
+    p_sys->fifo = block_FifoNew();
+    if ( !p_sys->fifo )
+    {
+        msg_Err( p_access, "Failed to allocate block fifo." );
+        goto failed;
+    }
+
+    if ( vlc_clone( &p_sys->thread, Thread, p_access, VLC_THREAD_PRIORITY_HIGHEST ) )
+    {
+        msg_Err( p_access, "Failed to create thread." );
+        goto failed;
+    }
+
     free( psz_passphrase );
     free( psz_dst_addr );
     freeaddrinfo( res );
@@ -295,6 +254,9 @@ static int Open( vlc_object_t *p_this )
     return VLC_SUCCESS;
 
 failed:
+    if ( p_sys->fifo )
+        block_FifoRelease( p_sys->fifo );
+
     free( psz_passphrase );
 
     if ( psz_dst_addr != NULL)
@@ -303,12 +265,8 @@ failed:
     if ( res != NULL )
         freeaddrinfo( res );
 
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
-
     if ( p_sys != NULL )
     {
-        if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
         if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
 
         free( p_sys );
@@ -322,11 +280,12 @@ static void Close( vlc_object_t * p_this )
     sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
     sout_access_out_sys_t *p_sys = p_access->p_sys;
 
-    srt_epoll_release( p_sys->i_poll_id );
-    srt_close( p_sys->sock );
+    vlc_cancel( p_sys->thread );
+    vlc_join( p_sys->thread, NULL );
 
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
+    block_FifoRelease( p_sys->fifo );
+
+    srt_close( p_sys->sock );
 
     free( p_sys );
 }
@@ -340,8 +299,6 @@ vlc_module_begin()
 
     add_integer( "chunk-size", SRT_DEFAULT_CHUNK_SIZE,
             N_("SRT chunk size (bytes)"), NULL, true )
-    add_integer( "poll-timeout", SRT_DEFAULT_POLL_TIMEOUT,
-            N_("Return poll wait after timeout milliseconds (-1 = infinite)"), NULL, true )
     add_integer( "latency", SRT_DEFAULT_LATENCY, N_("SRT latency (ms)"), NULL, true )
     add_password( "passphrase", "", N_("Password for stream encryption"), NULL, false )
     add_integer( "key-length", SRT_DEFAULT_KEY_LENGTH,
-- 
2.15.1



More information about the vlc-devel mailing list