[vlc-devel] [PATCH 1/2] access: srt: cancellable by block_cleanup_push in thread

Justin Kim justin.kim at collabora.com
Thu Jan 11 09:46:41 CET 2018


To support Win32, this patch removes platform-dependent
APIs. Although SRT provides a poll-like mechanism, it doesn't
support Win32 either. Therefore, in this patch, the srt module
receives packets in blocking mode in a thread which is cancellable
by the clean-up handler of VLC.

Signed-off-by: Justin Kim <justin.kim at collabora.com>
---
 modules/access/srt.c | 171 +++++++++++++++++++++------------------------------
 1 file changed, 69 insertions(+), 102 deletions(-)

diff --git a/modules/access/srt.c b/modules/access/srt.c
index bf770a42a5..b845157d46 100644
--- a/modules/access/srt.c
+++ b/modules/access/srt.c
@@ -41,8 +41,6 @@
 /* libsrt defines default packet size as 1316 internally
  * so srt module takes same value. */
 #define SRT_DEFAULT_CHUNK_SIZE 1316
-/* The default timeout is -1 (infinite) */
-#define SRT_DEFAULT_POLL_TIMEOUT -1
 /* The default latency is 125
  * which uses srt library internally */
 #define SRT_DEFAULT_LATENCY 125
@@ -60,22 +58,50 @@ static const char *const srt_key_length_names[] = {
 struct stream_sys_t
 {
     SRTSOCKET   sock;
-    int         i_poll_id;
-    int         i_poll_timeout;
-    int         i_latency;
-    size_t      i_chunk_size;
-    int         i_pipe_fds[2];
+    vlc_thread_t thread;
+    bool        b_woken;
+    block_fifo_t *fifo;
+
 };
 
-static void srt_wait_interrupted(void *p_data)
+static void *Thread(void *p_data)
 {
     stream_t *p_stream = p_data;
     stream_sys_t *p_sys = p_stream->p_sys;
-    msg_Dbg( p_stream, "Waking up srt_epoll_wait");
-    if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
+
+    size_t i_chunk_size = var_InheritInteger( p_stream, "chunk-size" );
+
+    for(;;)
     {
-        msg_Err( p_stream, "Failed to send data to pipe");
+        int stat;
+        block_t *pkt = block_Alloc( i_chunk_size );
+
+        if ( unlikely( pkt == NULL ) )
+        {
+            break;
+        }
+
+        block_cleanup_push( pkt );
+        stat = srt_recvmsg( p_sys->sock, (char *)pkt->p_buffer, i_chunk_size );
+        vlc_cleanup_pop();
+
+        if ( stat == SRT_ERROR )
+        {
+            msg_Err( p_stream, "failed to recevie SRT packet (reason: %s)", srt_getlasterror_str() );
+            block_Release( pkt );
+            break;
+        }
+
+        pkt->i_buffer = stat;
+        block_FifoPut( p_sys->fifo, pkt );
     }
+
+    vlc_fifo_Lock( p_sys->fifo );
+    p_sys->b_woken = true;
+    vlc_fifo_Signal( p_sys->fifo );
+    vlc_fifo_Unlock( p_sys->fifo );
+
+    return NULL;
 }
 
 static int Control(stream_t *p_stream, int i_query, va_list args)
@@ -105,63 +131,22 @@ static int Control(stream_t *p_stream, int i_query, va_list args)
 static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof)
 {
     stream_sys_t *p_sys = p_stream->p_sys;
+    block_t *pkt;
 
-    block_t *pkt = block_Alloc( p_sys->i_chunk_size );
-
-    if ( unlikely( pkt == NULL ) )
-    {
-        return NULL;
-    }
-
-    vlc_interrupt_register( srt_wait_interrupted, p_stream);
-
-    SRTSOCKET ready[2];
-
-    if ( srt_epoll_wait( p_sys->i_poll_id,
-        ready, &(int) { 2 }, NULL, 0, p_sys->i_poll_timeout,
-        &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
-    {
-        int srt_err = srt_getlasterror( NULL );
-
-        /* Assuming that timeout error is normal when SRT socket is connected. */
-        if ( srt_err == SRT_ETIMEOUT && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
-        {
-            goto skip;
-        }
-
-        msg_Err( p_stream, "released poll wait (reason : %s)", srt_getlasterror_str() );
-        goto endofstream;
-    }
+    vlc_fifo_Lock( p_sys->fifo );
 
-    bool cancel = 0;
-    int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
-    if ( ret > 0 && cancel )
-    {
-        msg_Dbg( p_stream, "Cancelled running" );
-        goto endofstream;
+    while( vlc_fifo_IsEmpty( p_sys->fifo ) ) {
+        if( p_sys->b_woken )
+            break;
+        vlc_fifo_Wait( p_sys->fifo );
     }
 
-    int stat = srt_recvmsg( p_sys->sock, (char *)pkt->p_buffer, p_sys->i_chunk_size );
+    if ( ( pkt = vlc_fifo_DequeueUnlocked( p_sys->fifo ) ) == NULL)
+        *eof = true;
+    p_sys->b_woken = false;
+    vlc_fifo_Unlock( p_sys->fifo );
 
-    if ( stat == SRT_ERROR )
-    {
-        msg_Err( p_stream, "failed to recevie SRT packet (reason: %s)", srt_getlasterror_str() );
-        goto endofstream;
-    }
-
-    pkt->i_buffer = stat;
-    vlc_interrupt_unregister();
     return pkt;
-
-endofstream:
-    msg_Dbg( p_stream, "EOS");
-   *eof = true;
-skip:
-    block_Release(pkt);
-    srt_clearlasterror();
-    vlc_interrupt_unregister();
-
-    return NULL;
 }
 
 static int Open(vlc_object_t *p_this)
@@ -175,8 +160,9 @@ static int Open(vlc_object_t *p_this)
     int stat;
 
     char         *psz_passphrase = NULL;
+    int           i_latency;
 
-    p_sys = vlc_obj_malloc( p_this, sizeof( *p_sys ) );
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
     if( unlikely( p_sys == NULL ) )
         return VLC_ENOMEM;
 
@@ -186,10 +172,6 @@ static int Open(vlc_object_t *p_this)
         goto failed;
     }
 
-    p_sys->i_chunk_size = var_InheritInteger( p_stream, "chunk-size" );
-    p_sys->i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" );
-    p_sys->i_latency = var_InheritInteger( p_stream, "latency" );
-    p_sys->i_poll_id = -1;
     p_stream->p_sys = p_sys;
     p_stream->pf_block = BlockSRT;
     p_stream->pf_control = Control;
@@ -214,14 +196,15 @@ static int Open(vlc_object_t *p_this)
         goto failed;
     }
 
-    /* Make SRT non-blocking */
-    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
+    /* Make SRT blocking */
+    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { true }, sizeof( bool ) );
 
     /* Make sure TSBPD mode is enable (SRT mode) */
     srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
 
     /* Set latency */
-    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
+    i_latency = var_InheritInteger( p_stream, "latency" );
+    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &i_latency, sizeof( int ) );
 
     if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
     {
@@ -233,29 +216,24 @@ static int Open(vlc_object_t *p_this)
             &i_key_length, sizeof( int ) );
     }
 
-    p_sys->i_poll_id = srt_epoll_create();
-    if ( p_sys->i_poll_id == -1 )
+    stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr) );
+
+    if ( stat == SRT_ERROR )
     {
-        msg_Err( p_stream, "Failed to create poll id for SRT socket." );
+        msg_Err( p_stream, "Failed to connect to server." );
         goto failed;
     }
 
-    if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
+    p_sys->fifo = block_FifoNew();
+    if ( !p_sys->fifo )
     {
-        msg_Err( p_stream, "Failed to create pipe fds." );
+        msg_Err( p_stream, "Failed to allocate block fifo." );
         goto failed;
     }
 
-    fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
-
-    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_IN } );
-    srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
-
-    stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr) );
-
-    if ( stat == SRT_ERROR )
+    if ( vlc_clone( &p_sys->thread, Thread, p_stream, VLC_THREAD_PRIORITY_INPUT ) )
     {
-        msg_Err( p_stream, "Failed to connect to server." );
+        msg_Err( p_stream, "Failed to create thread." );
         goto failed;
     }
 
@@ -266,6 +244,8 @@ static int Open(vlc_object_t *p_this)
     return VLC_SUCCESS;
 
 failed:
+    if ( p_sys->fifo )
+        block_FifoRelease( p_sys->fifo );
 
     if ( parsed_url.psz_host != NULL
       && parsed_url.psz_buffer != NULL)
@@ -278,14 +258,6 @@ failed:
         freeaddrinfo( res );
     }
 
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
-
-    if ( p_sys->i_poll_id != -1 )
-    {
-        srt_epoll_release( p_sys->i_poll_id );
-        p_sys->i_poll_id = -1;
-    }
     srt_close( p_sys->sock );
 
     free (psz_passphrase);
@@ -298,16 +270,13 @@ static void Close(vlc_object_t *p_this)
     stream_t     *p_stream = (stream_t*)p_this;
     stream_sys_t *p_sys = p_stream->p_sys;
 
-    if ( p_sys->i_poll_id != -1 )
-    {
-        srt_epoll_release( p_sys->i_poll_id );
-        p_sys->i_poll_id = -1;
-    }
+    vlc_cancel( p_sys->thread );
+    vlc_join( p_sys->thread, NULL );
+
+    block_FifoRelease( p_sys->fifo );
+
     msg_Dbg( p_stream, "closing server" );
     srt_close( p_sys->sock );
-
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
 }
 
 /* Module descriptor */
@@ -319,8 +288,6 @@ vlc_module_begin ()
 
     add_integer( "chunk-size", SRT_DEFAULT_CHUNK_SIZE,
             N_("SRT chunk size (bytes)"), NULL, true )
-    add_integer( "poll-timeout", SRT_DEFAULT_POLL_TIMEOUT,
-            N_("Return poll wait after timeout milliseconds (-1 = infinite)"), NULL, true )
     add_integer( "latency", SRT_DEFAULT_LATENCY, N_("SRT latency (ms)"), NULL, true )
     add_password( "passphrase", "", N_("Password for stream encryption"), NULL, false )
     add_integer( "key-length", SRT_DEFAULT_KEY_LENGTH,
-- 
2.15.1



More information about the vlc-devel mailing list