[vlc-devel] [PATCH v5 1/3] access_out: srt: Refactoring to support connection recovery
Justin Kim
justin.kim at collabora.com
Wed Mar 28 12:29:44 CEST 2018
This is a refactoring code to support SRT connection recovery.
While doing this work, SRT modules support new features comparing
to the previous ones.
- Connection Recovery
When lost a SRT connection, this module can detect and try to re-connect.
- Interruptible by SRT APIs
'srt_epoll_wait' will be interrupted when all socket descriptors
are removed from its montoring list. Fortunately, SRT modules are using
only one socket.
- Platform Independent
Now, SRT modules no longer require to use a pipe. Therfore, from this version,
SRT modules can support Win32 platforms.
Based on code from:
Roman Diouskine <rdiouskine at haivision.com>
Signed-off-by: Justin Kim <justin.kim at collabora.com>
---
modules/access_output/Makefile.am | 1 +
modules/access_output/srt.c | 396 ++++++++++++++++++++++++--------------
2 files changed, 249 insertions(+), 148 deletions(-)
diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
index bc954ee03b..26560f400c 100644
--- a/modules/access_output/Makefile.am
+++ b/modules/access_output/Makefile.am
@@ -29,6 +29,7 @@ EXTRA_LTLIBRARIES += libaccess_output_shout_plugin.la
### SRT ###
libaccess_output_srt_plugin_la_SOURCES = access_output/srt.c
+libaccess_output_srt_plugin_la_CFLAGS = $(AM_CFLAGS) $(SRT_CFLAGS)
libaccess_output_srt_plugin_la_CPPFLAGS = $(AM_CPPFLAGS) $(SRT_CPPFLAGS)
libaccess_output_srt_plugin_la_LIBADD = $(SRT_LIBS) $(LIBPTHREAD)
libaccess_output_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
diff --git a/modules/access_output/srt.c b/modules/access_output/srt.c
index 904d8f56d0..f76bf52302 100644
--- a/modules/access_output/srt.c
+++ b/modules/access_output/srt.c
@@ -1,9 +1,11 @@
/*****************************************************************************
* srt.c: SRT (Secure Reliable Transport) output module
*****************************************************************************
- * Copyright (C) 2017, Collabora Ltd.
+ * Copyright (C) 2017-2018, Collabora Ltd.
+ * Copyright (C) 2018, Haivision Systems Inc.
*
* Authors: Justin Kim <justin.kim at collabora.com>
+ * Roman Diouskine <rdiouskine at haivision.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
@@ -24,9 +26,6 @@
# include "config.h"
#endif
-#include <errno.h>
-#include <fcntl.h>
-
#include <vlc_common.h>
#include <vlc_interrupt.h>
#include <vlc_fs.h>
@@ -62,27 +61,158 @@ struct sout_access_out_sys_t
{
SRTSOCKET sock;
int i_poll_id;
- int i_poll_timeout;
- int i_latency;
- size_t i_chunk_size;
- int i_pipe_fds[2];
+ bool b_interrupted;
+ vlc_mutex_t lock;
};
static void srt_wait_interrupted(void *p_data)
{
sout_access_out_t *p_access = p_data;
sout_access_out_sys_t *p_sys = p_access->p_sys;
- msg_Dbg( p_access, "Waking up srt_epoll_wait");
- if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
+
+ vlc_mutex_lock( &p_sys->lock );
+ if ( p_sys->i_poll_id >= 0 && p_sys->sock != SRT_INVALID_SOCK )
{
- msg_Err( p_access, "Failed to send data to event fd");
+ p_sys->b_interrupted = true;
+
+ msg_Dbg( p_access, "Waking up srt_epoll_wait");
+
+ /* Removing all socket descriptors from the monitoring list
+ * wakes up SRT's threads. We only have one to remove. */
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
}
+ vlc_mutex_unlock( &p_sys->lock );
+}
+
+static bool srt_schedule_reconnect(sout_access_out_t *p_access)
+{
+ char *psz_dst_addr = NULL;
+ int i_dst_port;
+ int i_latency;
+ int stat;
+ char *psz_passphrase = NULL;
+
+ struct addrinfo hints = {
+ .ai_socktype = SOCK_DGRAM,
+ }, *res = NULL;
+
+ sout_access_out_sys_t *p_sys = p_access->p_sys;
+ bool failed = false;
+
+ psz_passphrase = var_InheritString( p_access, "passphrase" );
+
+ i_dst_port = SRT_DEFAULT_PORT;
+ char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
+ if( !psz_dst_addr )
+ {
+ failed = true;
+ goto out;
+ }
+
+ if ( psz_parser[0] == '[' )
+ psz_parser = strchr( psz_parser, ']' );
+
+ psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
+ if ( psz_parser != NULL )
+ {
+ *psz_parser++ = '\0';
+ i_dst_port = atoi( psz_parser );
+ }
+
+ stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
+ if ( stat )
+ {
+ msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
+ psz_dst_addr,
+ i_dst_port,
+ gai_strerror( stat ) );
+
+ failed = true;
+ goto out;
+ }
+
+ /* Always start with a fresh socket */
+ if ( p_sys->sock != SRT_INVALID_SOCK )
+ {
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+ srt_close( p_sys->sock );
+ }
+
+ p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
+ if ( p_sys->sock == SRT_INVALID_SOCK )
+ {
+ msg_Err( p_access, "Failed to open socket." );
+ failed = true;
+ goto out;
+ }
+
+ /* Make SRT non-blocking */
+ srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN,
+ &(bool) { false }, sizeof( bool ) );
+ srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN,
+ &(bool) { false }, sizeof( bool ) );
+
+ /* Make sure TSBPD mode is enable (SRT mode) */
+ srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE,
+ &(int) { 1 }, sizeof( int ) );
+
+ /* This is an access_out so it is always a sender */
+ srt_setsockopt( p_sys->sock, 0, SRTO_SENDER,
+ &(int) { 1 }, sizeof( int ) );
+
+ /* Set latency */
+ i_latency = var_InheritInteger( p_access, "latency" );
+ srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY,
+ &i_latency, sizeof( int ) );
+
+ if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
+ {
+ int i_key_length = var_InheritInteger( p_access, "key-length" );
+ srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
+ psz_passphrase, strlen( psz_passphrase ) );
+ srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
+ &i_key_length, sizeof( int ) );
+ }
+
+ srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
+
+ srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+ &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
+
+ /* Schedule a connect */
+ msg_Dbg( p_access, "Schedule SRT connect (dest addresss: %s, port: %d).",
+ psz_dst_addr, i_dst_port );
+
+ stat = srt_connect( p_sys->sock, res->ai_addr, res->ai_addrlen );
+ if ( stat == SRT_ERROR )
+ {
+ msg_Err( p_access, "Failed to connect to server (reason: %s)",
+ srt_getlasterror_str() );
+ failed = true;
+ }
+
+out:
+ if (failed && p_sys->sock != SRT_INVALID_SOCK)
+ {
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+ srt_close(p_sys->sock);
+ p_sys->sock = SRT_INVALID_SOCK;
+ }
+
+ free( psz_passphrase );
+ free( psz_dst_addr );
+ freeaddrinfo( res );
+
+ return !failed;
}
static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
{
sout_access_out_sys_t *p_sys = p_access->p_sys;
int i_len = 0;
+ size_t i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
+ int i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
+ bool b_interrupted = false;
vlc_interrupt_register( srt_wait_interrupted, p_access);
@@ -94,56 +224,116 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
while( p_buffer->i_buffer )
{
- size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_chunk_size );
- SRTSOCKET ready[2];
+ if ( vlc_killed() )
+ {
+ /* We are told to stop. Stop. */
+ i_len = VLC_EGENERIC;
+ goto out;
+ }
+
+ switch( srt_getsockstate( p_sys->sock ) )
+ {
+ case SRTS_CONNECTED:
+ /* Good to go */
+ break;
+ case SRTS_BROKEN:
+ case SRTS_NONEXIST:
+ case SRTS_CLOSED:
+ /* Failed. Schedule recovery. */
+ if ( !srt_schedule_reconnect( p_access ) )
+ msg_Err( p_access, "Failed to schedule connect");
+ /* Fall-through */
+ default:
+ /* Not ready */
+ i_len = VLC_EGENERIC;
+ goto out;
+ }
-retry:
+ SRTSOCKET ready[1];
+ int readycnt = 1;
if ( srt_epoll_wait( p_sys->i_poll_id,
- 0, 0, ready, &(int){ 2 }, p_sys->i_poll_timeout,
- &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
+ 0, 0, &ready[0], &readycnt,
+ i_poll_timeout, NULL, 0, NULL, 0 ) < 0)
{
- /* Assuming that timeout error is normal when SRT socket is connected. */
- if ( srt_getlasterror( NULL ) == SRT_ETIMEOUT &&
- srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
+ if ( vlc_killed() )
{
- srt_clearlasterror();
- goto retry;
+ /* We are told to stop. Stop. */
+ i_len = VLC_EGENERIC;
+ goto out;
}
- i_len = VLC_EGENERIC;
- goto out;
- }
+ /* if 'srt_epoll_wait' is interrupted, we still need to
+ * finish sending current block or it may be sent only
+ * partially. TODO: this delay can be prevented,
+ * possibly with a FIFO and an additional thread.
+ */
+ vlc_mutex_lock( &p_sys->lock );
+ if ( p_sys->b_interrupted )
+ {
+ srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+ &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT });
+ p_sys->b_interrupted = false;
+ b_interrupted = true;
+ }
+ vlc_mutex_unlock( &p_sys->lock );
- bool cancel = 0;
- int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
- if ( ret > 0 && cancel )
- {
- msg_Dbg( p_access, "Cancelled running" );
- i_len = 0;
- goto out;
+ if ( !b_interrupted )
+ {
+ continue;
+ }
+ else if ( (true) )
+ {
+ msg_Dbg( p_access, "srt_epoll_wait was interrupted");
+ }
}
- if ( srt_sendmsg2( p_sys->sock, (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
- msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
+ if ( readycnt > 0 && ready[0] == p_sys->sock
+ && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED)
+ {
+ size_t i_write = __MIN( p_buffer->i_buffer, i_chunk_size );
+ if (srt_sendmsg2( p_sys->sock,
+ (char *)p_buffer->p_buffer, i_write, 0 ) == SRT_ERROR )
+ {
+ msg_Warn( p_access, "send error: %s", srt_getlasterror_str() );
+ i_len = VLC_EGENERIC;
+ goto out;
+ }
- p_buffer->p_buffer += i_write;
- p_buffer->i_buffer -= i_write;
+ p_buffer->p_buffer += i_write;
+ p_buffer->i_buffer -= i_write;
+ }
}
p_next = p_buffer->p_next;
block_Release( p_buffer );
p_buffer = p_next;
+
+ if ( b_interrupted )
+ {
+ goto out;
+ }
}
out:
vlc_interrupt_unregister();
+
+ /* Re-add the socket to the poll if we were interrupted */
+ vlc_mutex_lock( &p_sys->lock );
+ if ( p_sys->b_interrupted )
+ {
+ srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+ &(int) { SRT_EPOLL_ERR | SRT_EPOLL_OUT } );
+ p_sys->b_interrupted = false;
+ }
+ vlc_mutex_unlock( &p_sys->lock );
+
if ( i_len <= 0 ) block_ChainRelease( p_buffer );
return i_len;
}
static int Control( sout_access_out_t *p_access, int i_query, va_list args )
{
- VLC_UNUSED (p_access);
+ VLC_UNUSED( p_access );
int i_ret = VLC_SUCCESS;
@@ -166,15 +356,6 @@ static int Open( vlc_object_t *p_this )
sout_access_out_t *p_access = (sout_access_out_t*)p_this;
sout_access_out_sys_t *p_sys = NULL;
- char *psz_dst_addr = NULL;
- int i_dst_port;
- int stat;
- char *psz_passphrase = NULL;
-
- struct addrinfo hints = {
- .ai_socktype = SOCK_DGRAM,
- }, *res = NULL;
-
if (var_Create ( p_access, "dst-port", VLC_VAR_INTEGER )
|| var_Create ( p_access, "src-port", VLC_VAR_INTEGER )
|| var_Create ( p_access, "dst-addr", VLC_VAR_STRING )
@@ -184,77 +365,15 @@ static int Open( vlc_object_t *p_this )
return VLC_ENOMEM;
}
- if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
+ p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+ if( unlikely( p_sys == NULL ) )
return VLC_ENOMEM;
- p_sys->i_chunk_size = var_InheritInteger( p_access, "chunk-size" );
- p_sys->i_poll_timeout = var_InheritInteger( p_access, "poll-timeout" );
- p_sys->i_latency = var_InheritInteger( p_access, "latency" );
- p_sys->i_poll_id = -1;
+ srt_startup();
- p_access->p_sys = p_sys;
-
- psz_passphrase = var_InheritString( p_access, "passphrase" );
-
- i_dst_port = SRT_DEFAULT_PORT;
- char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
- if( !psz_dst_addr )
- {
- free( p_sys );
- return VLC_ENOMEM;
- }
-
- if (psz_parser[0] == '[')
- psz_parser = strchr (psz_parser, ']');
-
- psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
- if (psz_parser != NULL)
- {
- *psz_parser++ = '\0';
- i_dst_port = atoi (psz_parser);
- }
-
- msg_Dbg( p_access, "Setting SRT socket (dest addresss: %s, port: %d).",
- psz_dst_addr, i_dst_port );
-
- stat = vlc_getaddrinfo( psz_dst_addr, i_dst_port, &hints, &res );
- if ( stat )
- {
- msg_Err( p_access, "Cannot resolve [%s]:%d (reason: %s)",
- psz_dst_addr,
- i_dst_port,
- gai_strerror( stat ) );
+ vlc_mutex_init( &p_sys->lock );
- goto failed;
- }
-
- p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
- if ( p_sys->sock == SRT_ERROR )
- {
- msg_Err( p_access, "Failed to open socket." );
- goto failed;
- }
-
- /* Make SRT non-blocking */
- srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
-
- /* Make sure TSBPD mode is enable (SRT mode) */
- srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
-
- /* This is an access_out so it is always a sender */
- srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof( int ) );
-
- /* Set latency */
- srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
-
- if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
- {
- int i_key_length = var_InheritInteger( p_access, "key-length" );
- srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
- psz_passphrase, strlen( psz_passphrase ) );
- srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
- &i_key_length, sizeof( int ) );
- }
+ p_access->p_sys = p_sys;
p_sys->i_poll_id = srt_epoll_create();
if ( p_sys->i_poll_id == -1 )
@@ -265,53 +384,28 @@ static int Open( vlc_object_t *p_this )
goto failed;
}
- if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
+ if ( !srt_schedule_reconnect( p_access ) )
{
- msg_Err( p_access, "Failed to create pipe fds." );
- goto failed;
- }
- fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
-
- srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_OUT });
- srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, &(int) { 1 }, sizeof(int) );
+ msg_Err( p_access, "Failed to schedule connect");
- srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
-
- stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
- if ( stat == SRT_ERROR )
- {
- msg_Err( p_access, "Failed to connect to server (reason: %s)",
- srt_getlasterror_str() );
goto failed;
}
p_access->pf_write = Write;
p_access->pf_control = Control;
- free( psz_passphrase );
- free( psz_dst_addr );
- freeaddrinfo( res );
-
return VLC_SUCCESS;
failed:
- free( psz_passphrase );
-
- if ( psz_dst_addr != NULL)
- free( psz_dst_addr );
-
- if ( res != NULL )
- freeaddrinfo( res );
-
- vlc_close( p_sys->i_pipe_fds[0] );
- vlc_close( p_sys->i_pipe_fds[1] );
+ vlc_mutex_destroy( &p_sys->lock );
if ( p_sys != NULL )
{
- if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
+ if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
- free( p_sys );
+ vlc_obj_free( p_this, p_sys );
+ p_access->p_sys = NULL;
}
return VLC_EGENERIC;
@@ -322,13 +416,19 @@ static void Close( vlc_object_t * p_this )
sout_access_out_t *p_access = (sout_access_out_t*)p_this;
sout_access_out_sys_t *p_sys = p_access->p_sys;
- srt_epoll_release( p_sys->i_poll_id );
- srt_close( p_sys->sock );
+ if ( p_sys )
+ {
+ vlc_mutex_destroy( &p_sys->lock );
+
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+ srt_close( p_sys->sock );
+ srt_epoll_release( p_sys->i_poll_id );
- vlc_close( p_sys->i_pipe_fds[0] );
- vlc_close( p_sys->i_pipe_fds[1] );
+ vlc_obj_free( p_this, p_sys );
+ p_access->p_sys = NULL;
+ }
- free( p_sys );
+ srt_cleanup();
}
/* Module descriptor */
--
2.16.3
More information about the vlc-devel
mailing list