[vlc-devel] [PATCH 2/3] access: srt: Refactoring to support connection recovery

Justin Kim justin.kim at collabora.com
Tue Jan 30 09:13:33 CET 2018


This is a refactoring code to support SRT connection recovery.
While doing this work, SRT modules support new features comparing
to the previous ones.

 - Connection Recovery
   When lost a SRT connection, this module can detect and try to re-connect.

 - Interruptible by SRT APIs
   'srt_epoll_wait' will be interrupted when all socket descriptors
   are removed from its montoring list. Fortunately, SRT modules are using
   only one socket.

 - Platform Independent
   Now, SRT modules no longer require to use a pipe. Therfore, from this version,
   SRT modules can support Win32 platforms.

Based on code from:
Roman Diouskine <rdiouskine at haivision.com>

Signed-off-by: Justin Kim <justin.kim at collabora.com>
---
 modules/access/srt.c | 338 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 201 insertions(+), 137 deletions(-)

diff --git a/modules/access/srt.c b/modules/access/srt.c
index bf770a42a5..410d15f749 100644
--- a/modules/access/srt.c
+++ b/modules/access/srt.c
@@ -1,9 +1,11 @@
 /*****************************************************************************
  * srt.c: SRT (Secure Reliable Transport) input module
  *****************************************************************************
- * Copyright (C) 2017, Collabora Ltd.
+ * Copyright (C) 2017-2018, Collabora Ltd.
+ * Copyright (C) 2018, Haivision Systems Inc.
  *
  * Authors: Justin Kim <justin.kim at collabora.com>
+ *          Roman Diouskine <rdiouskine at haivision.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published by
@@ -24,9 +26,6 @@
 # include "config.h"
 #endif
 
-#include <errno.h>
-#include <fcntl.h>
-
 #include <vlc_common.h>
 #include <vlc_interrupt.h>
 #include <vlc_fs.h>
@@ -61,21 +60,27 @@ struct stream_sys_t
 {
     SRTSOCKET   sock;
     int         i_poll_id;
-    int         i_poll_timeout;
-    int         i_latency;
-    size_t      i_chunk_size;
-    int         i_pipe_fds[2];
+    vlc_mutex_t lock;
+    bool        b_interrupted;
 };
 
 static void srt_wait_interrupted(void *p_data)
 {
     stream_t *p_stream = p_data;
     stream_sys_t *p_sys = p_stream->p_sys;
-    msg_Dbg( p_stream, "Waking up srt_epoll_wait");
-    if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
+
+    vlc_mutex_lock( &p_sys->lock );
+    if ( p_sys->i_poll_id >= 0 &&  p_sys->sock != SRT_INVALID_SOCK )
     {
-        msg_Err( p_stream, "Failed to send data to pipe");
+        p_sys->b_interrupted = true;
+
+        msg_Dbg( p_stream, "Waking up srt_epoll_wait");
+
+        /* Removing all socket descriptors from the monitoring list
+         * wakes up SRT's threads. We only have one to remove. */
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
     }
+    vlc_mutex_unlock( &p_sys->lock );
 }
 
 static int Control(stream_t *p_stream, int i_query, va_list args)
@@ -102,97 +107,28 @@ static int Control(stream_t *p_stream, int i_query, va_list args)
     return i_ret;
 }
 
-static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof)
+static bool srt_schedule_reconnect(stream_t *p_stream)
 {
-    stream_sys_t *p_sys = p_stream->p_sys;
-
-    block_t *pkt = block_Alloc( p_sys->i_chunk_size );
-
-    if ( unlikely( pkt == NULL ) )
-    {
-        return NULL;
-    }
-
-    vlc_interrupt_register( srt_wait_interrupted, p_stream);
-
-    SRTSOCKET ready[2];
-
-    if ( srt_epoll_wait( p_sys->i_poll_id,
-        ready, &(int) { 2 }, NULL, 0, p_sys->i_poll_timeout,
-        &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
-    {
-        int srt_err = srt_getlasterror( NULL );
-
-        /* Assuming that timeout error is normal when SRT socket is connected. */
-        if ( srt_err == SRT_ETIMEOUT && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
-        {
-            goto skip;
-        }
-
-        msg_Err( p_stream, "released poll wait (reason : %s)", srt_getlasterror_str() );
-        goto endofstream;
-    }
-
-    bool cancel = 0;
-    int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
-    if ( ret > 0 && cancel )
-    {
-        msg_Dbg( p_stream, "Cancelled running" );
-        goto endofstream;
-    }
-
-    int stat = srt_recvmsg( p_sys->sock, (char *)pkt->p_buffer, p_sys->i_chunk_size );
-
-    if ( stat == SRT_ERROR )
-    {
-        msg_Err( p_stream, "failed to recevie SRT packet (reason: %s)", srt_getlasterror_str() );
-        goto endofstream;
-    }
-
-    pkt->i_buffer = stat;
-    vlc_interrupt_unregister();
-    return pkt;
-
-endofstream:
-    msg_Dbg( p_stream, "EOS");
-   *eof = true;
-skip:
-    block_Release(pkt);
-    srt_clearlasterror();
-    vlc_interrupt_unregister();
-
-    return NULL;
-}
+    int         i_latency;
+    int         stat;
+    char        *psz_passphrase = NULL;
+    vlc_url_t   parsed_url = { 0 };
 
-static int Open(vlc_object_t *p_this)
-{
-    stream_t     *p_stream = (stream_t*)p_this;
-    stream_sys_t *p_sys = NULL;
-    vlc_url_t     parsed_url = { 0 };
     struct addrinfo hints = {
         .ai_socktype = SOCK_DGRAM,
     }, *res = NULL;
-    int stat;
 
-    char         *psz_passphrase = NULL;
-
-    p_sys = vlc_obj_malloc( p_this, sizeof( *p_sys ) );
-    if( unlikely( p_sys == NULL ) )
-        return VLC_ENOMEM;
+    stream_sys_t *p_sys = p_stream->p_sys;
+    bool failed = false;
 
     if ( vlc_UrlParse( &parsed_url, p_stream->psz_url ) == -1 )
     {
-        msg_Err( p_stream, "Failed to parse a given URL (%s)", p_stream->psz_url );
-        goto failed;
-    }
+        msg_Err( p_stream, "Failed to parse input URL (%s)",
+            p_stream->psz_url );
 
-    p_sys->i_chunk_size = var_InheritInteger( p_stream, "chunk-size" );
-    p_sys->i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" );
-    p_sys->i_latency = var_InheritInteger( p_stream, "latency" );
-    p_sys->i_poll_id = -1;
-    p_stream->p_sys = p_sys;
-    p_stream->pf_block = BlockSRT;
-    p_stream->pf_control = Control;
+        failed = true;
+        goto out;
+    }
 
     psz_passphrase = var_InheritString( p_stream, "passphrase" );
 
@@ -204,91 +140,216 @@ static int Open(vlc_object_t *p_this)
                  parsed_url.i_port,
                  gai_strerror( stat ) );
 
-        goto failed;
+        failed = true;
+        goto out;
+    }
+
+    /* Always start with a fresh socket */
+    if (p_sys->sock != SRT_INVALID_SOCK)
+    {
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+        srt_close(p_sys->sock);
     }
 
     p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
-    if ( p_sys->sock == SRT_ERROR )
+    if ( p_sys->sock == SRT_INVALID_SOCK )
     {
         msg_Err( p_stream, "Failed to open socket." );
-        goto failed;
+        failed = true;
+        goto out;
     }
 
     /* Make SRT non-blocking */
-    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
+    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN,
+        &(bool) { false }, sizeof( bool ) );
+    srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN,
+        &(bool) { false }, sizeof( bool ) );
 
     /* Make sure TSBPD mode is enable (SRT mode) */
-    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
+    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE,
+        &(int) { 1 }, sizeof( int ) );
+
+    /* This is an access module so it is always a receiver */
+    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER,
+        &(int) { 0 }, sizeof( int ) );
 
     /* Set latency */
-    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
+    i_latency = var_InheritInteger( p_stream, "latency" );
+    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY,
+        &i_latency, sizeof( int ) );
 
     if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
     {
         int i_key_length = var_InheritInteger( p_stream, "key-length" );
-
         srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
             psz_passphrase, strlen( psz_passphrase ) );
         srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
             &i_key_length, sizeof( int ) );
     }
 
-    p_sys->i_poll_id = srt_epoll_create();
-    if ( p_sys->i_poll_id == -1 )
+    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+        &(int) { SRT_EPOLL_ERR | SRT_EPOLL_IN });
+
+    /* Schedule a connect */
+    msg_Dbg( p_stream, "Schedule SRT connect (dest addresss: %s, port: %d).",
+        parsed_url.psz_host, parsed_url.i_port);
+
+    stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
+    if ( stat == SRT_ERROR )
     {
-        msg_Err( p_stream, "Failed to create poll id for SRT socket." );
-        goto failed;
+        msg_Err( p_stream, "Failed to connect to server (reason: %s)",
+                 srt_getlasterror_str() );
+        failed = true;
     }
 
-    if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
+out:
+    if (failed && p_sys->sock != SRT_INVALID_SOCK)
     {
-        msg_Err( p_stream, "Failed to create pipe fds." );
-        goto failed;
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+        srt_close(p_sys->sock);
+        p_sys->sock = SRT_INVALID_SOCK;
     }
 
-    fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
+    vlc_UrlClean( &parsed_url );
+    freeaddrinfo( res );
+    free( psz_passphrase );
 
-    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_IN } );
-    srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
+    return !failed;
+}
 
-    stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr) );
+static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof)
+{
+    stream_sys_t *p_sys = p_stream->p_sys;
+    int i_chunk_size = var_InheritInteger( p_stream, "chunk-size" );
+    int i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" );
 
-    if ( stat == SRT_ERROR )
+    if ( vlc_killed() )
     {
-        msg_Err( p_stream, "Failed to connect to server." );
-        goto failed;
+        /* We are told to stop. Stop. */
+        return NULL;
     }
 
-    vlc_UrlClean( &parsed_url );
-    freeaddrinfo( res );
-    free (psz_passphrase);
-
-    return VLC_SUCCESS;
+    block_t *pkt = block_Alloc( i_chunk_size );
+    if ( unlikely( pkt == NULL ) )
+    {
+        return NULL;
+    }
 
-failed:
+    vlc_interrupt_register( srt_wait_interrupted, p_stream);
 
-    if ( parsed_url.psz_host != NULL
-      && parsed_url.psz_buffer != NULL)
+    SRTSOCKET ready[1];
+    int readycnt = 1;
+    while ( srt_epoll_wait( p_sys->i_poll_id,
+        ready, &readycnt, 0, 0,
+        i_poll_timeout, NULL, 0, NULL, 0 ) >= 0)
     {
-        vlc_UrlClean( &parsed_url );
+        if ( readycnt < 0  || ready[0] != p_sys->sock )
+        {
+            /* should never happen, force recovery */
+            srt_close(p_sys->sock);
+            p_sys->sock = SRT_INVALID_SOCK;
+        }
+
+        switch( srt_getsockstate( p_sys->sock ) )
+        {
+            case SRTS_CONNECTED:
+                /* Good to go */
+                break;
+            case SRTS_BROKEN:
+            case SRTS_NONEXIST:
+            case SRTS_CLOSED:
+                /* Failed. Schedule recovery. */
+                if ( !srt_schedule_reconnect( p_stream ) )
+                    msg_Err( p_stream, "Failed to schedule connect" );
+                /* Fall-through */
+            default:
+                /* Not ready */
+                continue;
+        }
+
+        int stat = srt_recvmsg( p_sys->sock,
+            (char *)pkt->p_buffer, i_chunk_size );
+        if ( stat > 0 )
+        {
+            pkt->i_buffer = stat;
+            goto out;
+        }
+
+        msg_Err( p_stream, "failed to receive packet, set EOS (reason: %s)",
+            srt_getlasterror_str() );
+        *eof = true;
+        break;
     }
 
-    if ( res != NULL )
+    /* if the poll reports errors for any reason at all
+     * including a timeout, or there is a read error,
+     * we skip the turn.
+     */
+
+    block_Release(pkt);
+    pkt = NULL;
+
+out:
+    vlc_interrupt_unregister();
+
+    /* Re-add the socket to the poll if we were interrupted */
+    vlc_mutex_lock( &p_sys->lock );
+    if ( p_sys->b_interrupted )
     {
-        freeaddrinfo( res );
+        srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+            &(int) { SRT_EPOLL_ERR | SRT_EPOLL_IN } );
+        p_sys->b_interrupted = false;
     }
+    vlc_mutex_unlock( &p_sys->lock );
 
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
+    return pkt;
+}
 
-    if ( p_sys->i_poll_id != -1 )
+static int Open(vlc_object_t *p_this)
+{
+    stream_t     *p_stream = (stream_t*)p_this;
+    stream_sys_t *p_sys = NULL;
+
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+    if( unlikely( p_sys == NULL ) )
+        return VLC_ENOMEM;
+
+    srt_startup();
+
+    vlc_mutex_init( &p_sys->lock );
+
+    p_stream->p_sys = p_sys;
+
+    p_sys->i_poll_id = srt_epoll_create();
+    if ( p_sys->i_poll_id == -1 )
     {
-        srt_epoll_release( p_sys->i_poll_id );
-        p_sys->i_poll_id = -1;
+        msg_Err( p_stream, "Failed to create poll id for SRT socket." );
+        goto failed;
     }
-    srt_close( p_sys->sock );
 
-    free (psz_passphrase);
+    if ( !srt_schedule_reconnect( p_stream ) )
+    {
+        msg_Err( p_stream, "Failed to schedule connect");
+
+        goto failed;
+    }
+
+    p_stream->pf_block = BlockSRT;
+    p_stream->pf_control = Control;
+
+    return VLC_SUCCESS;
+
+failed:
+    vlc_mutex_destroy( &p_sys->lock );
+
+    if ( p_sys != NULL )
+    {
+        if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
+        if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
+
+        vlc_obj_free( p_this, p_sys );
+        p_stream->p_sys = NULL;
+    }
 
     return VLC_EGENERIC;
 }
@@ -298,16 +359,19 @@ static void Close(vlc_object_t *p_this)
     stream_t     *p_stream = (stream_t*)p_this;
     stream_sys_t *p_sys = p_stream->p_sys;
 
-    if ( p_sys->i_poll_id != -1 )
+    if ( p_sys )
     {
+        vlc_mutex_destroy( &p_sys->lock );
+
+        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+        srt_close( p_sys->sock );
         srt_epoll_release( p_sys->i_poll_id );
-        p_sys->i_poll_id = -1;
+
+        vlc_obj_free( p_this, p_sys );
+        p_stream->p_sys = NULL;
     }
-    msg_Dbg( p_stream, "closing server" );
-    srt_close( p_sys->sock );
 
-    vlc_close( p_sys->i_pipe_fds[0] );
-    vlc_close( p_sys->i_pipe_fds[1] );
+    srt_cleanup();
 }
 
 /* Module descriptor */
-- 
2.16.1



More information about the vlc-devel mailing list