[vlc-devel] [vlc-commits] access_out: srt: refactor to support connection recovery
Steve Lhomme
robux4 at ycbcr.xyz
Tue Apr 3 10:21:47 CEST 2018
Can this be backported to 3.0 ?
Same with ba758d7fa8a8d1e0de6c15dda11fd7d38fb24d84
Le 28/03/2018 à 14:00, Justin Kim a écrit :
> vlc | branch: master | Justin Kim <justin.kim at collabora.com> | Wed Mar 28 19:29:44 2018 +0900| [867f005c508b1d38058d38761e5ca9273bed3a82] | committer: Thomas Guillem
>
> access_out: srt: refactor to support connection recovery
>
> This refactors the code to support SRT connection recovery.
> As part of this work, the SRT modules gain new features compared
> to the previous version.
>
> - Connection Recovery
> When an SRT connection is lost, this module can detect it and try to reconnect.
>
> - Interruptible by SRT APIs
> 'srt_epoll_wait' will be interrupted when all socket descriptors
> are removed from its monitoring list. Fortunately, SRT modules use
> only one socket.
>
> - Platform Independent
> SRT modules no longer need to use a pipe. Therefore, from this version on,
> SRT modules can support Win32 platforms.
>
> Based on code from:
> Roman Diouskine <rdiouskine at haivision.com>
>
> Signed-off-by: Justin Kim <justin.kim at collabora.com>
> Signed-off-by: Thomas Guillem <thomas at gllm.fr>
>
>> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=867f005c508b1d38058d38761e5ca9273bed3a82
> ---
>
> modules/access_output/Makefile.am | 1 +
> modules/access_output/srt.c | 396 ++++++++++++++++++++++++--------------
> 2 files changed, 249 insertions(+), 148 deletions(-)
>
> diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
> index bc954ee03b..26560f400c 100644
> --- a/modules/access_output/Makefile.am
> +++ b/modules/access_output/Makefile.am
> @@ -29,6 +29,7 @@ EXTRA_LTLIBRARIES += libaccess_output_shout_plugin.la
>
> ### SRT ###
> libaccess_output_srt_plugin_la_SOURCES = access_output/srt.c
> +libaccess_output_srt_plugin_la_CFLAGS = $(AM_CFLAGS) $(SRT_CFLAGS)
> libaccess_output_srt_plugin_la_CPPFLAGS = $(AM_CPPFLAGS) $(SRT_CPPFLAGS)
> libaccess_output_srt_plugin_la_LIBADD = $(SRT_LIBS) $(LIBPTHREAD)
> libaccess_output_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
> diff --git a/modules/access_output/srt.c b/modules/access_output/srt.c
> index 904d8f56d0..f76bf52302 100644
> --- a/modules/access_output/srt.c
> +++ b/modules/access_output/srt.c
> @@ -1,9 +1,11 @@
> /*****************************************************************************
> * srt.c: SRT (Secure Reliable Transport) output module
> *****************************************************************************
> - * Copyright (C) 2017, Collabora Ltd.
> + * Copyright (C) 2017-2018, Collabora Ltd.
> + * Copyright (C) 2018, Haivision Systems Inc.
> *
> * Authors: Justin Kim <justin.kim at collabora.com>
> + * Roman Diouskine <rdiouskine at haivision.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU Lesser General Public License as published by
> @@ -24,9 +26,6 @@
> # include "config.h"
> #endif
>
> -#include <errno.h>
> -#include <fcntl.h>
> -
> #include <vlc_common.h>
> #include <vlc_interrupt.h>
> #include <vlc_fs.h>
> @@ -62,27 +61,158 @@ struct sout_access_out_sys_t
> {
> SRTSOCKET sock;
> int i_poll_id;
> - int i_poll_timeout;
> - int i_latency;
> - size_t i_chunk_size;
> - int i_pipe_fds[2];
> + bool b_interrupted;
> + vlc_mutex_t lock;
> };
>
> static void srt_wait_interrupted(void *p_data)
> {
> sout_access_out_t *p_access = p_data;
> sout_access_out_sys_t *p_sys = p_access->p_sys;
> - msg_Dbg( p_access, "Waking up srt_epoll_wait");
> - if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
> +
> + vlc_mutex_lock( &p_sys->lock );
> + if ( p_sys->i_poll_id >= 0 && p_sys->sock != SRT_INVALID_SOCK )
> {
> - msg_Err( p_access, "Failed to send data to event fd");
> + p_sys->b_interrupted = true;
> +
> + msg_Dbg( p_access, "Waking up srt_epoll_wait");
> +
> + /* Removing all socket descriptors from the monitoring list
> + * wakes up SRT's threads. We only have one to remove. */
> + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> }
> + vlc_mutex_unlock( &p_sys->lock );
> +}
> +
> +static bool srt_schedule_reconnect(sout_access_out_t *p_access)
> +{
> + char *psz_dst_addr = NULL;
> + int i_dst_port;
> + int i_latency;
> + int stat;
> + char *psz_passphrase = NULL;
> +
> + struct addrinfo hints = {
> + .ai_socktype = SOCK_DGRAM,
> + }, *res = NULL;
> +
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + bool failed = false;
> +
> + psz_passphrase = var_InheritString( p_access, "passphrase" );
> +
> + i_dst_port = SRT_DEFAULT_PORT;
> + char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
> + if( !psz_dst_addr )
> + {
> + failed = true;
> + goto out;
> + }
> +
> + if ( psz_parser[0] == '[' )
> + psz_parser = strchr( psz_parser, ']' );
> +
> + psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
> + if ( psz_parser != NULL )
> + {
> + *psz_parser++ = '\0';
> + i_dst_port = atoi( psz_parser );
> + }
> +
> + stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
> + if ( stat )
> + {
> + msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
> + psz_dst_addr,
> + i_dst_port,
> + gai_strerror( stat ) );
> +
> + failed = true;
> + goto out;
> + }
> +
> + /* Always start with a fresh socket */
> + if ( p_sys->sock != SRT_INVALID_SOCK )
> + {
> + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> + srt_close( p_sys->sock );
> + }
> +
> + p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
> + if ( p_sys->sock == SRT_INVALID_SOCK )
> + {
> + msg_Err( p_access, "Failed to open socket." );
> + failed = true;
> + goto out;
> + }
> +
> + /* Make SRT non-blocking */
> + srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN,
> + &(bool) { false }, sizeof( bool ) );
> + srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN,
> + &(bool) { false }, sizeof( bool ) );
> +
> + /* Make sure TSBPD mode is enable (SRT mode) */
> + srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE,
> + &(int) { 1 }, sizeof( int ) );
> +
> + /* This is an access_out so it is always a sender */
> + srt_setsockopt( p_sys->sock, 0, SRTO_SENDER,
> + &(int) { 1 }, sizeof( int ) );
> +
> + /* Set latency */
> + i_latency = var_InheritInteger( p_access, "latency" );
> + srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY,
> + &i_latency, sizeof( int ) );
> +
> + if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
> + {
> + int i_key_length = var_InheritInteger( p_access, "key-length" );
> + srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
> + psz_passphrase, strlen( psz_passphrase ) );
> + srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
> + &i_key_length, sizeof( int ) );
> + }
> +
> + srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
> +
> + srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
> + &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
> +
> + /* Schedule a connect */
> + msg_Dbg( p_access, "Schedule SRT connect (dest addresss: %s, port: %d).",
> + psz_dst_addr, i_dst_port );
> +
> + stat = srt_connect( p_sys->sock, res->ai_addr, res->ai_addrlen );
> + if ( stat == SRT_ERROR )
> + {
> + msg_Err( p_access, "Failed to connect to server (reason: %s)",
> + srt_getlasterror_str() );
> + failed = true;
> + }
> +
> +out:
> + if (failed && p_sys->sock != SRT_INVALID_SOCK)
> + {
> + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> + srt_close(p_sys->sock);
> + p_sys->sock = SRT_INVALID_SOCK;
> + }
> +
> + free( psz_passphrase );
> + free( psz_dst_addr );
> + freeaddrinfo( res );
> +
> + return !failed;
> }
>
> static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
> {
> sout_access_out_sys_t *p_sys = p_access->p_sys;
> int i_len = 0;
> + size_t i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
> + int i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
> + bool b_interrupted = false;
>
> vlc_interrupt_register( srt_wait_interrupted, p_access);
>
> @@ -94,56 +224,116 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
>
> while( p_buffer->i_buffer )
> {
> - size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_chunk_size );
> - SRTSOCKET ready[2];
> + if ( vlc_killed() )
> + {
> + /* We are told to stop. Stop. */
> + i_len = VLC_EGENERIC;
> + goto out;
> + }
> +
> + switch( srt_getsockstate( p_sys->sock ) )
> + {
> + case SRTS_CONNECTED:
> + /* Good to go */
> + break;
> + case SRTS_BROKEN:
> + case SRTS_NONEXIST:
> + case SRTS_CLOSED:
> + /* Failed. Schedule recovery. */
> + if ( !srt_schedule_reconnect( p_access ) )
> + msg_Err( p_access, "Failed to schedule connect");
> + /* Fall-through */
> + default:
> + /* Not ready */
> + i_len = VLC_EGENERIC;
> + goto out;
> + }
>
> -retry:
> + SRTSOCKET ready[1];
> + int readycnt = 1;
> if ( srt_epoll_wait( p_sys->i_poll_id,
> - 0, 0, ready, &(int){ 2 }, p_sys->i_poll_timeout,
> - &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
> + 0, 0, &ready[0], &readycnt,
> + i_poll_timeout, NULL, 0, NULL, 0 ) < 0)
> {
> - /* Assuming that timeout error is normal when SRT socket is connected. */
> - if ( srt_getlasterror( NULL ) == SRT_ETIMEOUT &&
> - srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
> + if ( vlc_killed() )
> {
> - srt_clearlasterror();
> - goto retry;
> + /* We are told to stop. Stop. */
> + i_len = VLC_EGENERIC;
> + goto out;
> }
>
> - i_len = VLC_EGENERIC;
> - goto out;
> - }
> + /* if 'srt_epoll_wait' is interrupted, we still need to
> + * finish sending current block or it may be sent only
> + * partially. TODO: this delay can be prevented,
> + * possibly with a FIFO and an additional thread.
> + */
> + vlc_mutex_lock( &p_sys->lock );
> + if ( p_sys->b_interrupted )
> + {
> + srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
> + &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
> + p_sys->b_interrupted = false;
> + b_interrupted = true;
> + }
> + vlc_mutex_unlock( &p_sys->lock );
>
> - bool cancel = 0;
> - int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
> - if ( ret > 0 && cancel )
> - {
> - msg_Dbg( p_access, "Cancelled running" );
> - i_len = 0;
> - goto out;
> + if ( !b_interrupted )
> + {
> + continue;
> + }
> + else if ( (true) )
> + {
> + msg_Dbg( p_access, "srt_epoll_wait was interrupted");
> + }
> }
>
> - if ( srt_sendmsg2( p_sys->sock, (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
> - msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
> + if ( readycnt > 0 && ready[0] == p_sys->sock
> + && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED)
> + {
> + size_t i_write = __MIN( p_buffer->i_buffer, i_chunk_size );
> + if (srt_sendmsg2( p_sys->sock,
> + (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
> + {
> + msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
> + i_len = VLC_EGENERIC;
> + goto out;
> + }
>
> - p_buffer->p_buffer += i_write;
> - p_buffer->i_buffer -= i_write;
> + p_buffer->p_buffer += i_write;
> + p_buffer->i_buffer -= i_write;
> + }
> }
>
> p_next = p_buffer->p_next;
> block_Release( p_buffer );
> p_buffer = p_next;
> +
> + if ( b_interrupted )
> + {
> + goto out;
> + }
> }
>
> out:
> vlc_interrupt_unregister();
> +
> + /* Re-add the socket to the poll if we were interrupted */
> + vlc_mutex_lock( &p_sys->lock );
> + if ( p_sys->b_interrupted )
> + {
> + srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
> + &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT } );
> + p_sys->b_interrupted = false;
> + }
> + vlc_mutex_unlock( &p_sys->lock );
> +
> if ( i_len <= 0 ) block_ChainRelease( p_buffer );
> return i_len;
> }
>
> static int Control( sout_access_out_t *p_access, int i_query, va_list args )
> {
> - VLC_UNUSED (p_access);
> + VLC_UNUSED( p_access );
>
> int i_ret = VLC_SUCCESS;
>
> @@ -166,15 +356,6 @@ static int Open( vlc_object_t *p_this )
> sout_access_out_t *p_access = (sout_access_out_t*)p_this;
> sout_access_out_sys_t *p_sys = NULL;
>
> - char *psz_dst_addr = NULL;
> - int i_dst_port;
> - int stat;
> - char *psz_passphrase = NULL;
> -
> - struct addrinfo hints = {
> - .ai_socktype = SOCK_DGRAM,
> - }, *res = NULL;
> -
> if (var_Create ( p_access, "dst-port", VLC_VAR_INTEGER )
> || var_Create ( p_access, "src-port", VLC_VAR_INTEGER )
> || var_Create ( p_access, "dst-addr", VLC_VAR_STRING )
> @@ -184,77 +365,15 @@ static int Open( vlc_object_t *p_this )
> return VLC_ENOMEM;
> }
>
> - if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
> + p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
> + if( unlikely( p_sys == NULL ) )
> return VLC_ENOMEM;
>
> - p_sys->i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
> - p_sys->i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
> - p_sys->i_latency = var_InheritInteger( p_access, "latency" );
> - p_sys->i_poll_id = -1;
> + srt_startup();
>
> - p_access->p_sys = p_sys;
> -
> - psz_passphrase = var_InheritString( p_access, "passphrase" );
> -
> - i_dst_port = SRT_DEFAULT_PORT;
> - char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
> - if( !psz_dst_addr )
> - {
> - free( p_sys );
> - return VLC_ENOMEM;
> - }
> -
> - if (psz_parser[0] == '[')
> - psz_parser = strchr (psz_parser, ']');
> -
> - psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
> - if (psz_parser != NULL)
> - {
> - *psz_parser++ = '\0';
> - i_dst_port = atoi (psz_parser);
> - }
> -
> - msg_Dbg( p_access, "Setting SRT socket (dest addresss: %s, port: %d).",
> - psz_dst_addr, i_dst_port );
> -
> - stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
> - if ( stat )
> - {
> - msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
> - psz_dst_addr,
> - i_dst_port,
> - gai_strerror( stat ) );
> + vlc_mutex_init( &p_sys->lock );
>
> - goto failed;
> - }
> -
> - p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
> - if ( p_sys->sock == SRT_ERROR )
> - {
> - msg_Err( p_access, "Failed to open socket." );
> - goto failed;
> - }
> -
> - /* Make SRT non-blocking */
> - srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
> -
> - /* Make sure TSBPD mode is enable (SRT mode) */
> - srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
> -
> - /* This is an access_out so it is always a sender */
> - srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof( int ) );
> -
> - /* Set latency */
> - srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
> -
> - if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
> - {
> - int i_key_length = var_InheritInteger( p_access, "key-length" );
> - srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
> - psz_passphrase, strlen( psz_passphrase ) );
> - srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
> - &i_key_length, sizeof( int ) );
> - }
> + p_access->p_sys = p_sys;
>
> p_sys->i_poll_id = srt_epoll_create();
> if ( p_sys->i_poll_id == -1 )
> @@ -265,53 +384,28 @@ static int Open( vlc_object_t *p_this )
> goto failed;
> }
>
> - if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
> + if ( !srt_schedule_reconnect( p_access ) )
> {
> - msg_Err( p_access, "Failed to create pipe fds." );
> - goto failed;
> - }
> - fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
> -
> - srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_OUT });
> - srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
> + msg_Err( p_access, "Failed to schedule connect");
>
> - srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
> -
> - stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
> - if ( stat == SRT_ERROR )
> - {
> - msg_Err( p_access, "Failed to connect to server (reason: %s)",
> - srt_getlasterror_str() );
> goto failed;
> }
>
> p_access->pf_write = Write;
> p_access->pf_control = Control;
>
> - free( psz_passphrase );
> - free( psz_dst_addr );
> - freeaddrinfo( res );
> -
> return VLC_SUCCESS;
>
> failed:
> - free( psz_passphrase );
> -
> - if ( psz_dst_addr != NULL)
> - free( psz_dst_addr );
> -
> - if ( res != NULL )
> - freeaddrinfo( res );
> -
> - vlc_close( p_sys->i_pipe_fds[0] );
> - vlc_close( p_sys->i_pipe_fds[1] );
> + vlc_mutex_destroy( &p_sys->lock );
>
> if ( p_sys != NULL )
> {
> - if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
> if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
> + if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
>
> - free( p_sys );
> + vlc_obj_free( p_this, p_sys );
> + p_access->p_sys = NULL;
> }
>
> return VLC_EGENERIC;
> @@ -322,13 +416,19 @@ static void Close( vlc_object_t * p_this )
> sout_access_out_t *p_access = (sout_access_out_t*)p_this;
> sout_access_out_sys_t *p_sys = p_access->p_sys;
>
> - srt_epoll_release( p_sys->i_poll_id );
> - srt_close( p_sys->sock );
> + if ( p_sys )
> + {
> + vlc_mutex_destroy( &p_sys->lock );
> +
> + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
> + srt_close( p_sys->sock );
> + srt_epoll_release( p_sys->i_poll_id );
>
> - vlc_close( p_sys->i_pipe_fds[0] );
> - vlc_close( p_sys->i_pipe_fds[1] );
> + vlc_obj_free( p_this, p_sys );
> + p_access->p_sys = NULL;
> + }
>
> - free( p_sys );
> + srt_cleanup();
> }
>
> /* Module descriptor */
>
> _______________________________________________
> vlc-commits mailing list
> vlc-commits at videolan.org
> https://mailman.videolan.org/listinfo/vlc-commits
More information about the vlc-devel
mailing list