[vlc-devel] [PATCH v5 1/3] access_out: srt: Refactoring to support connection recovery

Justin Kim justin.kim at collabora.com
Wed Mar 28 12:29:44 CEST 2018


This patch refactors the code to support SRT connection recovery.
As part of this work, the SRT modules gain the following new features
compared to the previous version.

 - Connection Recovery
   When an SRT connection is lost, this module can detect it and try to reconnect.

 - Interruptible by SRT APIs
   'srt_epoll_wait' will be interrupted when all socket descriptors
   are removed from its monitoring list. Fortunately, the SRT modules use
   only one socket.

 - Platform Independent
   The SRT modules no longer need to use a pipe. Therefore, from this version on,
   the SRT modules can support Win32 platforms.

Based on code from:
Roman Diouskine <rdiouskine at haivision.com>

Signed-off-by: Justin Kim <justin.kim at collabora.com>
---
 modules/access_output/Makefile.am |   1 +
 modules/access_output/srt.c       | 396 ++++++++++++++++++++++++--------------
 2 files changed, 249 insertions(+), 148 deletions(-)

diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
index bc954ee03b..26560f400c 100644
--- a/modules/access_output/Makefile.am
+++ b/modules/access_output/Makefile.am
@@ -29,6 +29,7 @@ EXTRA_LTLIBRARIES += libaccess_output_shout_plugin.la
 
 ### SRT ###
 libaccess_output_srt_plugin_la_SOURCES = access_output/srt.c
+libaccess_output_srt_plugin_la_CFLAGS = $(AM_CFLAGS) $(SRT_CFLAGS)
 libaccess_output_srt_plugin_la_CPPFLAGS = $(AM_CPPFLAGS) $(SRT_CPPFLAGS)
 libaccess_output_srt_plugin_la_LIBADD = $(SRT_LIBS) $(LIBPTHREAD)
 libaccess_output_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
diff --git a/modules/access_output/srt.c b/modules/access_output/srt.c
index 904d8f56d0..f76bf52302 100644
--- a/modules/access_output/srt.c
+++ b/modules/access_output/srt.c
@@ -1,9 +1,11 @@
 /*****************************************************************************
  * srt.c: SRT (Secure Reliable Transport) output module
  *****************************************************************************
- * Copyright (C) 2017, Collabora Ltd.
+ * Copyright (C) 2017-2018, Collabora Ltd.
+ * Copyright (C) 2018, Haivision Systems Inc.
  *
  * Authors: Justin Kim <justin.kim at collabora.com>
+ *          Roman Diouskine <rdiouskine at haivision.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published by
@@ -24,9 +26,6 @@
 # include "config.h"
 #endif
 
-#include <errno.h>
-#include <fcntl.h>
-
 #include <vlc_common.h>
 #include <vlc_interrupt.h>
 #include <vlc_fs.h>
@@ -62,27 +61,158 @@ struct sout_access_out_sys_t
 {
     SRTSOCKET     sock;
     int           i_poll_id;
-    int           i_poll_timeout;
-    int           i_latency;
-    size_t        i_chunk_size;
-    int           i_pipe_fds[2];
+    bool          b_interrupted;
+    vlc_mutex_t   lock;
 };
 
 static void srt_wait_interrupted(void *p_data)
 {
     sout_access_out_t *p_access = p_data;
     sout_access_out_sys_t *p_sys = p_access->p_sys;
-    msg_Dbg( p_access, "Waking up srt_epoll_wait");
-    if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
+
+    vlc_mutex_lock( &p_sys->lock );
+    if ( p_sys->i_poll_id >= 0 &&  p_sys->sock != SRT_INVALID_SOCK )
     {
-        msg_Err( p_access, "Failed to send data to event fd");
+        p_sys->b_interrupted = true;
+
+        msg_Dbg( p_access, "Waking up srt_epoll_wait");
+
+        /* Removing all socket descriptors from the monitoring list
+         * wakes up SRT's threads. We only have one to remove. */
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
     }
+    vlc_mutex_unlock( &p_sys->lock );
+}
+
+static bool srt_schedule_reconnect(sout_access_out_t *p_access)
+{
+    char                    *psz_dst_addr = NULL;
+    int                      i_dst_port;
+    int                      i_latency;
+    int                      stat;
+    char                    *psz_passphrase = NULL;
+
+    struct addrinfo hints = {
+        .ai_socktype = SOCK_DGRAM,
+    }, *res = NULL;
+
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    bool failed = false;
+
+    psz_passphrase = var_InheritString( p_access, "passphrase" );
+
+    i_dst_port = SRT_DEFAULT_PORT;
+    char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
+    if( !psz_dst_addr )
+    {
+        failed = true;
+        goto out;
+    }
+
+    if ( psz_parser[0] == '[' )
+        psz_parser = strchr( psz_parser, ']' );
+
+    psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
+    if ( psz_parser != NULL )
+    {
+        *psz_parser++ = '\0';
+        i_dst_port = atoi( psz_parser );
+    }
+
+    stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
+    if ( stat )
+    {
+        msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
+                 psz_dst_addr,
+                 i_dst_port,
+                 gai_strerror( stat ) );
+
+        failed = true;
+        goto out;
+    }
+
+    /* Always start with a fresh socket */
+    if ( p_sys->sock != SRT_INVALID_SOCK )
+    {
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+        srt_close( p_sys->sock );
+    }
+
+    p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
+    if ( p_sys->sock == SRT_INVALID_SOCK )
+    {
+        msg_Err( p_access, "Failed to open socket." );
+        failed = true;
+        goto out;
+    }
+
+    /* Make SRT non-blocking */
+    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN,
+        &(bool) { false }, sizeof( bool ) );
+    srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN,
+        &(bool) { false }, sizeof( bool ) );
+
+    /* Make sure TSBPD mode is enable (SRT mode) */
+    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE,
+        &(int) { 1 }, sizeof( int ) );
+
+    /* This is an access_out so it is always a sender */
+    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER,
+        &(int) { 1 }, sizeof( int ) );
+
+    /* Set latency */
+    i_latency = var_InheritInteger( p_access, "latency" );
+    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY,
+        &i_latency, sizeof( int ) );
+
+    if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
+    {
+        int i_key_length = var_InheritInteger( p_access, "key-length" );
+        srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
+            psz_passphrase, strlen( psz_passphrase ) );
+        srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
+            &i_key_length, sizeof( int ) );
+    }
+
+    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
+
+    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+        &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
+
+    /* Schedule a connect */
+    msg_Dbg( p_access, "Schedule SRT connect (dest addresss: %s, port: %d).",
+        psz_dst_addr, i_dst_port );
+
+    stat = srt_connect( p_sys->sock, res->ai_addr, res->ai_addrlen );
+    if ( stat == SRT_ERROR )
+    {
+        msg_Err( p_access, "Failed to connect to server (reason: %s)",
+                 srt_getlasterror_str() );
+        failed = true;
+    }
+
+out:
+    if (failed && p_sys->sock != SRT_INVALID_SOCK)
+    {
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+        srt_close(p_sys->sock);
+        p_sys->sock = SRT_INVALID_SOCK;
+    }
+
+    free( psz_passphrase );
+    free( psz_dst_addr );
+    freeaddrinfo( res );
+
+    return !failed;
 }
 
 static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
 {
     sout_access_out_sys_t *p_sys = p_access->p_sys;
     int i_len = 0;
+    size_t i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
+    int i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
+    bool b_interrupted = false;
 
     vlc_interrupt_register( srt_wait_interrupted, p_access);
 
@@ -94,56 +224,116 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
 
         while( p_buffer->i_buffer )
         {
-            size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_chunk_size );
-            SRTSOCKET ready[2];
+            if ( vlc_killed() )
+            {
+                /* We are told to stop. Stop. */
+                i_len = VLC_EGENERIC;
+                goto out;
+            }
+
+            switch( srt_getsockstate( p_sys->sock ) )
+            {
+                case SRTS_CONNECTED:
+                    /* Good to go */
+                    break;
+                case SRTS_BROKEN:
+                case SRTS_NONEXIST:
+                case SRTS_CLOSED:
+                    /* Failed. Schedule recovery. */
+                    if ( !srt_schedule_reconnect( p_access ) )
+                        msg_Err( p_access, "Failed to schedule connect");
+                    /* Fall-through */
+                default:
+                    /* Not ready */
+                    i_len = VLC_EGENERIC;
+                    goto out;
+            }
 
-retry:
+            SRTSOCKET ready[1];
+            int readycnt = 1;
             if ( srt_epoll_wait( p_sys->i_poll_id,
-                0, 0, ready, &(int){ 2 }, p_sys->i_poll_timeout,
-                &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
+                0, 0, &ready[0], &readycnt,
+                i_poll_timeout, NULL, 0, NULL, 0 ) < 0)
             {
-                /* Assuming that timeout error is normal when SRT socket is connected. */
-                if ( srt_getlasterror( NULL ) == SRT_ETIMEOUT &&
-                     srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
+                if ( vlc_killed() )
                 {
-                    srt_clearlasterror();
-                    goto retry;
+                    /* We are told to stop. Stop. */
+                    i_len = VLC_EGENERIC;
+                    goto out;
                 }
 
-                i_len = VLC_EGENERIC;
-                goto out;
-            }
+                /* if 'srt_epoll_wait' is interrupted, we still need to
+                *  finish sending current block or it may be sent only
+                *  partially. TODO: this delay can be prevented,
+                *  possibly with a FIFO and an additional thread.
+                */
+                vlc_mutex_lock( &p_sys->lock );
+                if ( p_sys->b_interrupted )
+                {
+                    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+                        &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
+                    p_sys->b_interrupted = false;
+                    b_interrupted = true;
+                }
+                vlc_mutex_unlock( &p_sys->lock );
 
-            bool cancel = 0;
-            int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
-            if ( ret > 0 && cancel )
-            {
-                msg_Dbg( p_access, "Cancelled running" );
-                i_len = 0;
-                goto out;
+                if ( !b_interrupted )
+                {
+                    continue;
+                }
+                else if ( (true) )
+                {
+                    msg_Dbg( p_access, "srt_epoll_wait was interrupted");
+                }
             }
 
-            if ( srt_sendmsg2( p_sys->sock, (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
-                msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
+            if ( readycnt > 0  && ready[0] == p_sys->sock
+                && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED)
+            {
+                size_t i_write = __MIN( p_buffer->i_buffer, i_chunk_size );
+                if (srt_sendmsg2( p_sys->sock,
+                    (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
+                {
+                    msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
+                    i_len = VLC_EGENERIC;
+                    goto out;
+                }
 
-            p_buffer->p_buffer += i_write;
-            p_buffer->i_buffer -= i_write;
+                p_buffer->p_buffer += i_write;
+                p_buffer->i_buffer -= i_write;
+            }
         }
 
         p_next = p_buffer->p_next;
         block_Release( p_buffer );
         p_buffer = p_next;
+
+        if ( b_interrupted )
+        {
+            goto out;
+        }
     }
 
 out:
     vlc_interrupt_unregister();
+
+    /* Re-add the socket to the poll if we were interrupted */
+    vlc_mutex_lock( &p_sys->lock );
+    if ( p_sys->b_interrupted )
+    {
+        srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+            &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT } );
+        p_sys->b_interrupted = false;
+    }
+    vlc_mutex_unlock( &p_sys->lock );
+
     if ( i_len <= 0 ) block_ChainRelease( p_buffer );
     return i_len;
 }
 
 static int Control( sout_access_out_t *p_access, int i_query, va_list args )
 {
-    VLC_UNUSED (p_access);
+    VLC_UNUSED( p_access );
 
     int i_ret = VLC_SUCCESS;
 
@@ -166,15 +356,6 @@ static int Open( vlc_object_t *p_this )
     sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
     sout_access_out_sys_t   *p_sys = NULL;
 
-    char                    *psz_dst_addr = NULL;
-    int                      i_dst_port;
-    int                      stat;
-    char                    *psz_passphrase = NULL;
-
-    struct addrinfo hints = {
-        .ai_socktype = SOCK_DGRAM,
-    }, *res = NULL;
-
     if (var_Create ( p_access, "dst-port", VLC_VAR_INTEGER )
      || var_Create ( p_access, "src-port", VLC_VAR_INTEGER )
      || var_Create ( p_access, "dst-addr", VLC_VAR_STRING )
@@ -184,77 +365,15 @@ static int Open( vlc_object_t *p_this )
         return VLC_ENOMEM;
     }
 
-    if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+    if( unlikely( p_sys == NULL ) )
         return VLC_ENOMEM;
 
-    p_sys->i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
-    p_sys->i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
-    p_sys->i_latency = var_InheritInteger( p_access, "latency" );
-    p_sys->i_poll_id = -1;
+    srt_startup();
 
-    p_access->p_sys = p_sys;
-
-    psz_passphrase = var_InheritString( p_access, "passphrase" );
-
-    i_dst_port = SRT_DEFAULT_PORT;
-    char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
-    if( !psz_dst_addr )
-    {
-        free( p_sys );
-        return VLC_ENOMEM;
-    }
-
-    if (psz_parser[0] == '[')
-        psz_parser = strchr (psz_parser, ']');
-
-    psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
-    if (psz_parser != NULL)
-    {
-        *psz_parser++ = '\0';
-        i_dst_port = atoi (psz_parser);
-    }
-
-    msg_Dbg( p_access, "Setting SRT socket (dest addresss: %s, port: %d).",
-             psz_dst_addr, i_dst_port );
-
-    stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
-    if ( stat )
-    {
-        msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
-                 psz_dst_addr,
-                 i_dst_port,
-                 gai_strerror( stat ) );
+    vlc_mutex_init( &p_sys->lock );
 
-        goto failed;
-    }
-
-    p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
-    if ( p_sys->sock == SRT_ERROR )
-    {
-        msg_Err( p_access, "Failed to open socket." );
-        goto failed;
-    }
-
-    /* Make SRT non-blocking */
-    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
-
-    /* Make sure TSBPD mode is enable (SRT mode) */
-    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
-
-    /* This is an access_out so it is always a sender */
-    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof( int ) );
-
-    /* Set latency */
-    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
-
-    if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
-    {
-        int i_key_length = var_InheritInteger( p_access, "key-length" );
-        srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
-            psz_passphrase, strlen( psz_passphrase ) );
-        srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
-            &i_key_length, sizeof( int ) );
-    }
+    p_access->p_sys = p_sys;
 
     p_sys->i_poll_id = srt_epoll_create();
     if ( p_sys->i_poll_id == -1 )
@@ -265,53 +384,28 @@ static int Open( vlc_object_t *p_this )
         goto failed;
     }
 
-    if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
+    if ( !srt_schedule_reconnect( p_access ) )
     {
-        msg_Err( p_access, "Failed to create pipe fds." );
-        goto failed;
-    }
-    fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
-
-    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_OUT });
-    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
+        msg_Err( p_access, "Failed to schedule connect");
 
-    srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
-
-    stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
-    if ( stat == SRT_ERROR )
-    {
-        msg_Err( p_access, "Failed to connect to server (reason: %s)",
-                 srt_getlasterror_str() );
         goto failed;
     }
 
     p_access->pf_write = Write;
     p_access->pf_control = Control;
 
-    free( psz_passphrase );
-    free( psz_dst_addr );
-    freeaddrinfo( res );
-
     return VLC_SUCCESS;
 
 failed:
-    free( psz_passphrase );
-
-    if ( psz_dst_addr != NULL)
-        free( psz_dst_addr );
-
-    if ( res != NULL )
-        freeaddrinfo( res );
-
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
+    vlc_mutex_destroy( &p_sys->lock );
 
     if ( p_sys != NULL )
     {
-        if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
         if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
+        if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
 
-        free( p_sys );
+        vlc_obj_free( p_this, p_sys );
+        p_access->p_sys = NULL;
     }
 
     return VLC_EGENERIC;
@@ -322,13 +416,19 @@ static void Close( vlc_object_t * p_this )
     sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
     sout_access_out_sys_t *p_sys = p_access->p_sys;
 
-    srt_epoll_release( p_sys->i_poll_id );
-    srt_close( p_sys->sock );
+    if ( p_sys )
+    {
+        vlc_mutex_destroy( &p_sys->lock );
+
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+        srt_close( p_sys->sock );
+        srt_epoll_release( p_sys->i_poll_id );
 
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
+        vlc_obj_free( p_this, p_sys );
+        p_access->p_sys = NULL;
+    }
 
-    free( p_sys );
+    srt_cleanup();
 }
 
 /* Module descriptor */
-- 
2.16.3



More information about the vlc-devel mailing list