[vlc-devel] [vlc-commits] access_out: srt: refactor to support connection recovery

Steve Lhomme robux4 at ycbcr.xyz
Tue Apr 3 10:21:47 CEST 2018


Can this be backported to 3.0?

Same with ba758d7fa8a8d1e0de6c15dda11fd7d38fb24d84


Le 28/03/2018 à 14:00, Justin Kim a écrit :
> vlc | branch: master | Justin Kim <justin.kim at collabora.com> | Wed Mar 28 19:29:44 2018 +0900| [867f005c508b1d38058d38761e5ca9273bed3a82] | committer: Thomas Guillem
>
> access_out: srt: refactor to support connection recovery
>
> This refactors the code to support SRT connection recovery.
> As part of this work, the SRT modules gain new features compared
> to the previous version.
>
>   - Connection Recovery
>     When an SRT connection is lost, this module can detect it and try to reconnect.
>
>   - Interruptible by SRT APIs
>     'srt_epoll_wait' is interrupted when all socket descriptors
>     are removed from its monitoring list. Fortunately, SRT modules use
>     only one socket.
>
>   - Platform Independent
>     SRT modules no longer need to use a pipe. Therefore, from this version on,
>     SRT modules can support Win32 platforms.
>
> Based on code from:
> Roman Diouskine <rdiouskine at haivision.com>
>
> Signed-off-by: Justin Kim <justin.kim at collabora.com>
> Signed-off-by: Thomas Guillem <thomas at gllm.fr>
>
>> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=867f005c508b1d38058d38761e5ca9273bed3a82
> ---
>
>   modules/access_output/Makefile.am |   1 +
>   modules/access_output/srt.c       | 396 ++++++++++++++++++++++++--------------
>   2 files changed, 249 insertions(+), 148 deletions(-)
>
> diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
> index bc954ee03b..26560f400c 100644
> --- a/modules/access_output/Makefile.am
> +++ b/modules/access_output/Makefile.am
> @@ -29,6 +29,7 @@ EXTRA_LTLIBRARIES += libaccess_output_shout_plugin.la
>   
>   ### SRT ###
>   libaccess_output_srt_plugin_la_SOURCES = access_output/srt.c
> +libaccess_output_srt_plugin_la_CFLAGS = $(AM_CFLAGS) $(SRT_CFLAGS)
>   libaccess_output_srt_plugin_la_CPPFLAGS = $(AM_CPPFLAGS) $(SRT_CPPFLAGS)
>   libaccess_output_srt_plugin_la_LIBADD = $(SRT_LIBS) $(LIBPTHREAD)
>   libaccess_output_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
> diff --git a/modules/access_output/srt.c b/modules/access_output/srt.c
> index 904d8f56d0..f76bf52302 100644
> --- a/modules/access_output/srt.c
> +++ b/modules/access_output/srt.c
> @@ -1,9 +1,11 @@
>   /*****************************************************************************
>    * srt.c: SRT (Secure Reliable Transport) output module
>    *****************************************************************************
> - * Copyright (C) 2017, Collabora Ltd.
> + * Copyright (C) 2017-2018, Collabora Ltd.
> + * Copyright (C) 2018, Haivision Systems Inc.
>    *
>    * Authors: Justin Kim <justin.kim at collabora.com>
> + *          Roman Diouskine <rdiouskine at haivision.com>
>    *
>    * This program is free software; you can redistribute it and/or modify it
>    * under the terms of the GNU Lesser General Public License as published by
> @@ -24,9 +26,6 @@
>   # include "config.h"
>   #endif
>   
> -#include <errno.h>
> -#include <fcntl.h>
> -
>   #include <vlc_common.h>
>   #include <vlc_interrupt.h>
>   #include <vlc_fs.h>
> @@ -62,27 +61,158 @@ struct sout_access_out_sys_t
>   {
>       SRTSOCKET     sock;
>       int           i_poll_id;
> -    int           i_poll_timeout;
> -    int           i_latency;
> -    size_t        i_chunk_size;
> -    int           i_pipe_fds[2];
> +    bool          b_interrupted;
> +    vlc_mutex_t   lock;
>   };
>   
>   static void srt_wait_interrupted(void *p_data)
>   {
>       sout_access_out_t *p_access = p_data;
>       sout_access_out_sys_t *p_sys = p_access->p_sys;
> -    msg_Dbg( p_access, "Waking up srt_epoll_wait");
> -    if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
> +
> +    vlc_mutex_lock( &p_sys->lock );
> +    if ( p_sys->i_poll_id >= 0 &&  p_sys->sock != SRT_INVALID_SOCK )
>       {
> -        msg_Err( p_access, "Failed to send data to event fd");
> +        p_sys->b_interrupted = true;
> +
> +        msg_Dbg( p_access, "Waking up srt_epoll_wait");
> +
> +        /* Removing all socket descriptors from the monitoring list
> +         * wakes up SRT's threads. We only have one to remove. */
> +        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
>       }
> +    vlc_mutex_unlock( &p_sys->lock );
> +}
> +
> +static bool srt_schedule_reconnect(sout_access_out_t *p_access)
> +{
> +    char                    *psz_dst_addr = NULL;
> +    int                      i_dst_port;
> +    int                      i_latency;
> +    int                      stat;
> +    char                    *psz_passphrase = NULL;
> +
> +    struct addrinfo hints = {
> +        .ai_socktype = SOCK_DGRAM,
> +    }, *res = NULL;
> +
> +    sout_access_out_sys_t *p_sys = p_access->p_sys;
> +    bool failed = false;
> +
> +    psz_passphrase = var_InheritString( p_access, "passphrase" );
> +
> +    i_dst_port = SRT_DEFAULT_PORT;
> +    char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
> +    if( !psz_dst_addr )
> +    {
> +        failed = true;
> +        goto out;
> +    }
> +
> +    if ( psz_parser[0] == '[' )
> +        psz_parser = strchr( psz_parser, ']' );
> +
> +    psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
> +    if ( psz_parser != NULL )
> +    {
> +        *psz_parser++ = '\0';
> +        i_dst_port = atoi( psz_parser );
> +    }
> +
> +    stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
> +    if ( stat )
> +    {
> +        msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
> +                 psz_dst_addr,
> +                 i_dst_port,
> +                 gai_strerror( stat ) );
> +
> +        failed = true;
> +        goto out;
> +    }
> +
> +    /* Always start with a fresh socket */
> +    if ( p_sys->sock != SRT_INVALID_SOCK )
> +    {
> +        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> +        srt_close( p_sys->sock );
> +    }
> +
> +    p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
> +    if ( p_sys->sock == SRT_INVALID_SOCK )
> +    {
> +        msg_Err( p_access, "Failed to open socket." );
> +        failed = true;
> +        goto out;
> +    }
> +
> +    /* Make SRT non-blocking */
> +    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN,
> +        &(bool) { false }, sizeof( bool ) );
> +    srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN,
> +        &(bool) { false }, sizeof( bool ) );
> +
> +    /* Make sure TSBPD mode is enable (SRT mode) */
> +    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE,
> +        &(int) { 1 }, sizeof( int ) );
> +
> +    /* This is an access_out so it is always a sender */
> +    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER,
> +        &(int) { 1 }, sizeof( int ) );
> +
> +    /* Set latency */
> +    i_latency = var_InheritInteger( p_access, "latency" );
> +    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY,
> +        &i_latency, sizeof( int ) );
> +
> +    if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
> +    {
> +        int i_key_length = var_InheritInteger( p_access, "key-length" );
> +        srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
> +            psz_passphrase, strlen( psz_passphrase ) );
> +        srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
> +            &i_key_length, sizeof( int ) );
> +    }
> +
> +    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
> +
> +    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
> +        &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
> +
> +    /* Schedule a connect */
> +    msg_Dbg( p_access, "Schedule SRT connect (dest addresss: %s, port: %d).",
> +        psz_dst_addr, i_dst_port );
> +
> +    stat = srt_connect( p_sys->sock, res->ai_addr, res->ai_addrlen );
> +    if ( stat == SRT_ERROR )
> +    {
> +        msg_Err( p_access, "Failed to connect to server (reason: %s)",
> +                 srt_getlasterror_str() );
> +        failed = true;
> +    }
> +
> +out:
> +    if (failed && p_sys->sock != SRT_INVALID_SOCK)
> +    {
> +        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> +        srt_close(p_sys->sock);
> +        p_sys->sock = SRT_INVALID_SOCK;
> +    }
> +
> +    free( psz_passphrase );
> +    free( psz_dst_addr );
> +    freeaddrinfo( res );
> +
> +    return !failed;
>   }
>   
>   static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
>   {
>       sout_access_out_sys_t *p_sys = p_access->p_sys;
>       int i_len = 0;
> +    size_t i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
> +    int i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
> +    bool b_interrupted = false;
>   
>       vlc_interrupt_register( srt_wait_interrupted, p_access);
>   
> @@ -94,56 +224,116 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
>   
>           while( p_buffer->i_buffer )
>           {
> -            size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_chunk_size );
> -            SRTSOCKET ready[2];
> +            if ( vlc_killed() )
> +            {
> +                /* We are told to stop. Stop. */
> +                i_len = VLC_EGENERIC;
> +                goto out;
> +            }
> +
> +            switch( srt_getsockstate( p_sys->sock ) )
> +            {
> +                case SRTS_CONNECTED:
> +                    /* Good to go */
> +                    break;
> +                case SRTS_BROKEN:
> +                case SRTS_NONEXIST:
> +                case SRTS_CLOSED:
> +                    /* Failed. Schedule recovery. */
> +                    if ( !srt_schedule_reconnect( p_access ) )
> +                        msg_Err( p_access, "Failed to schedule connect");
> +                    /* Fall-through */
> +                default:
> +                    /* Not ready */
> +                    i_len = VLC_EGENERIC;
> +                    goto out;
> +            }
>   
> -retry:
> +            SRTSOCKET ready[1];
> +            int readycnt = 1;
>               if ( srt_epoll_wait( p_sys->i_poll_id,
> -                0, 0, ready, &(int){ 2 }, p_sys->i_poll_timeout,
> -                &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
> +                0, 0, &ready[0], &readycnt,
> +                i_poll_timeout, NULL, 0, NULL, 0 ) < 0)
>               {
> -                /* Assuming that timeout error is normal when SRT socket is connected. */
> -                if ( srt_getlasterror( NULL ) == SRT_ETIMEOUT &&
> -                     srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
> +                if ( vlc_killed() )
>                   {
> -                    srt_clearlasterror();
> -                    goto retry;
> +                    /* We are told to stop. Stop. */
> +                    i_len = VLC_EGENERIC;
> +                    goto out;
>                   }
>   
> -                i_len = VLC_EGENERIC;
> -                goto out;
> -            }
> +                /* if 'srt_epoll_wait' is interrupted, we still need to
> +                *  finish sending current block or it may be sent only
> +                *  partially. TODO: this delay can be prevented,
> +                *  possibly with a FIFO and an additional thread.
> +                */
> +                vlc_mutex_lock( &p_sys->lock );
> +                if ( p_sys->b_interrupted )
> +                {
> +                    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
> +                        &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
> +                    p_sys->b_interrupted = false;
> +                    b_interrupted = true;
> +                }
> +                vlc_mutex_unlock( &p_sys->lock );
>   
> -            bool cancel = 0;
> -            int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
> -            if ( ret > 0 && cancel )
> -            {
> -                msg_Dbg( p_access, "Cancelled running" );
> -                i_len = 0;
> -                goto out;
> +                if ( !b_interrupted )
> +                {
> +                    continue;
> +                }
> +                else if ( (true) )
> +                {
> +                    msg_Dbg( p_access, "srt_epoll_wait was interrupted");
> +                }
>               }
>   
> -            if ( srt_sendmsg2( p_sys->sock, (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
> -                msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
> +            if ( readycnt > 0  && ready[0] == p_sys->sock
> +                && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED)
> +            {
> +                size_t i_write = __MIN( p_buffer->i_buffer, i_chunk_size );
> +                if (srt_sendmsg2( p_sys->sock,
> +                    (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
> +                {
> +                    msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
> +                    i_len = VLC_EGENERIC;
> +                    goto out;
> +                }
>   
> -            p_buffer->p_buffer += i_write;
> -            p_buffer->i_buffer -= i_write;
> +                p_buffer->p_buffer += i_write;
> +                p_buffer->i_buffer -= i_write;
> +            }
>           }
>   
>           p_next = p_buffer->p_next;
>           block_Release( p_buffer );
>           p_buffer = p_next;
> +
> +        if ( b_interrupted )
> +        {
> +            goto out;
> +        }
>       }
>   
>   out:
>       vlc_interrupt_unregister();
> +
> +    /* Re-add the socket to the poll if we were interrupted */
> +    vlc_mutex_lock( &p_sys->lock );
> +    if ( p_sys->b_interrupted )
> +    {
> +        srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
> +            &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT } );
> +        p_sys->b_interrupted = false;
> +    }
> +    vlc_mutex_unlock( &p_sys->lock );
> +
>       if ( i_len <= 0 ) block_ChainRelease( p_buffer );
>       return i_len;
>   }
>   
>   static int Control( sout_access_out_t *p_access, int i_query, va_list args )
>   {
> -    VLC_UNUSED (p_access);
> +    VLC_UNUSED( p_access );
>   
>       int i_ret = VLC_SUCCESS;
>   
> @@ -166,15 +356,6 @@ static int Open( vlc_object_t *p_this )
>       sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
>       sout_access_out_sys_t   *p_sys = NULL;
>   
> -    char                    *psz_dst_addr = NULL;
> -    int                      i_dst_port;
> -    int                      stat;
> -    char                    *psz_passphrase = NULL;
> -
> -    struct addrinfo hints = {
> -        .ai_socktype = SOCK_DGRAM,
> -    }, *res = NULL;
> -
>       if (var_Create ( p_access, "dst-port", VLC_VAR_INTEGER )
>        || var_Create ( p_access, "src-port", VLC_VAR_INTEGER )
>        || var_Create ( p_access, "dst-addr", VLC_VAR_STRING )
> @@ -184,77 +365,15 @@ static int Open( vlc_object_t *p_this )
>           return VLC_ENOMEM;
>       }
>   
> -    if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
> +    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
> +    if( unlikely( p_sys == NULL ) )
>           return VLC_ENOMEM;
>   
> -    p_sys->i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
> -    p_sys->i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
> -    p_sys->i_latency = var_InheritInteger( p_access, "latency" );
> -    p_sys->i_poll_id = -1;
> +    srt_startup();
>   
> -    p_access->p_sys = p_sys;
> -
> -    psz_passphrase = var_InheritString( p_access, "passphrase" );
> -
> -    i_dst_port = SRT_DEFAULT_PORT;
> -    char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
> -    if( !psz_dst_addr )
> -    {
> -        free( p_sys );
> -        return VLC_ENOMEM;
> -    }
> -
> -    if (psz_parser[0] == '[')
> -        psz_parser = strchr (psz_parser, ']');
> -
> -    psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
> -    if (psz_parser != NULL)
> -    {
> -        *psz_parser++ = '\0';
> -        i_dst_port = atoi (psz_parser);
> -    }
> -
> -    msg_Dbg( p_access, "Setting SRT socket (dest addresss: %s, port: %d).",
> -             psz_dst_addr, i_dst_port );
> -
> -    stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
> -    if ( stat )
> -    {
> -        msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
> -                 psz_dst_addr,
> -                 i_dst_port,
> -                 gai_strerror( stat ) );
> +    vlc_mutex_init( &p_sys->lock );
>   
> -        goto failed;
> -    }
> -
> -    p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
> -    if ( p_sys->sock == SRT_ERROR )
> -    {
> -        msg_Err( p_access, "Failed to open socket." );
> -        goto failed;
> -    }
> -
> -    /* Make SRT non-blocking */
> -    srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
> -
> -    /* Make sure TSBPD mode is enable (SRT mode) */
> -    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
> -
> -    /* This is an access_out so it is always a sender */
> -    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof( int ) );
> -
> -    /* Set latency */
> -    srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
> -
> -    if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
> -    {
> -        int i_key_length = var_InheritInteger( p_access, "key-length" );
> -        srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
> -            psz_passphrase, strlen( psz_passphrase ) );
> -        srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
> -            &i_key_length, sizeof( int ) );
> -    }
> +    p_access->p_sys = p_sys;
>   
>       p_sys->i_poll_id = srt_epoll_create();
>       if ( p_sys->i_poll_id == -1 )
> @@ -265,53 +384,28 @@ static int Open( vlc_object_t *p_this )
>           goto failed;
>       }
>   
> -    if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
> +    if ( !srt_schedule_reconnect( p_access ) )
>       {
> -        msg_Err( p_access, "Failed to create pipe fds." );
> -        goto failed;
> -    }
> -    fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
> -
> -    srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_OUT });
> -    srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
> +        msg_Err( p_access, "Failed to schedule connect");
>   
> -    srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
> -
> -    stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
> -    if ( stat == SRT_ERROR )
> -    {
> -        msg_Err( p_access, "Failed to connect to server (reason: %s)",
> -                 srt_getlasterror_str() );
>           goto failed;
>       }
>   
>       p_access->pf_write = Write;
>       p_access->pf_control = Control;
>   
> -    free( psz_passphrase );
> -    free( psz_dst_addr );
> -    freeaddrinfo( res );
> -
>       return VLC_SUCCESS;
>   
>   failed:
> -    free( psz_passphrase );
> -
> -    if ( psz_dst_addr != NULL)
> -        free( psz_dst_addr );
> -
> -    if ( res != NULL )
> -        freeaddrinfo( res );
> -
> -    vlc_close( p_sys->i_pipe_fds[0] );
> -    vlc_close( p_sys->i_pipe_fds[1] );
> +    vlc_mutex_destroy( &p_sys->lock );
>   
>       if ( p_sys != NULL )
>       {
> -        if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
>           if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
> +        if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
>   
> -        free( p_sys );
> +        vlc_obj_free( p_this, p_sys );
> +        p_access->p_sys = NULL;
>       }
>   
>       return VLC_EGENERIC;
> @@ -322,13 +416,19 @@ static void Close( vlc_object_t * p_this )
>       sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
>       sout_access_out_sys_t *p_sys = p_access->p_sys;
>   
> -    srt_epoll_release( p_sys->i_poll_id );
> -    srt_close( p_sys->sock );
> +    if ( p_sys )
> +    {
> +        vlc_mutex_destroy( &p_sys->lock );
> +
> +        srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> +        srt_close( p_sys->sock );
> +        srt_epoll_release( p_sys->i_poll_id );
>   
> -    vlc_close( p_sys->i_pipe_fds[0] );
> -    vlc_close( p_sys->i_pipe_fds[1] );
> +        vlc_obj_free( p_this, p_sys );
> +        p_access->p_sys = NULL;
> +    }
>   
> -    free( p_sys );
> +    srt_cleanup();
>   }
>   
>   /* Module descriptor */
>
> _______________________________________________
> vlc-commits mailing list
> vlc-commits at videolan.org
> https://mailman.videolan.org/listinfo/vlc-commits



More information about the vlc-devel mailing list