[vlc-devel] [PATCH 2/3] access: srt: Refactoring to support connection recovery
Justin Kim
justin.kim at collabora.com
Tue Jan 30 09:13:33 CET 2018
This is a refactoring code to support SRT connection recovery.
While doing this work, SRT modules support new features comparing
to the previous ones.
- Connection Recovery
When lost a SRT connection, this module can detect and try to re-connect.
- Interruptible by SRT APIs
'srt_epoll_wait' will be interrupted when all socket descriptors
are removed from its montoring list. Fortunately, SRT modules are using
only one socket.
- Platform Independent
Now, SRT modules no longer require to use a pipe. Therfore, from this version,
SRT modules can support Win32 platforms.
Based on code from:
Roman Diouskine <rdiouskine at haivision.com>
Signed-off-by: Justin Kim <justin.kim at collabora.com>
---
modules/access/srt.c | 338 ++++++++++++++++++++++++++++++---------------------
1 file changed, 201 insertions(+), 137 deletions(-)
diff --git a/modules/access/srt.c b/modules/access/srt.c
index bf770a42a5..410d15f749 100644
--- a/modules/access/srt.c
+++ b/modules/access/srt.c
@@ -1,9 +1,11 @@
/*****************************************************************************
* srt.c: SRT (Secure Reliable Transport) input module
*****************************************************************************
- * Copyright (C) 2017, Collabora Ltd.
+ * Copyright (C) 2017-2018, Collabora Ltd.
+ * Copyright (C) 2018, Haivision Systems Inc.
*
* Authors: Justin Kim <justin.kim at collabora.com>
+ * Roman Diouskine <rdiouskine at haivision.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
@@ -24,9 +26,6 @@
# include "config.h"
#endif
-#include <errno.h>
-#include <fcntl.h>
-
#include <vlc_common.h>
#include <vlc_interrupt.h>
#include <vlc_fs.h>
@@ -61,21 +60,27 @@ struct stream_sys_t
{
SRTSOCKET sock;
int i_poll_id;
- int i_poll_timeout;
- int i_latency;
- size_t i_chunk_size;
- int i_pipe_fds[2];
+ vlc_mutex_t lock;
+ bool b_interrupted;
};
static void srt_wait_interrupted(void *p_data)
{
stream_t *p_stream = p_data;
stream_sys_t *p_sys = p_stream->p_sys;
- msg_Dbg( p_stream, "Waking up srt_epoll_wait");
- if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 )
+
+ vlc_mutex_lock( &p_sys->lock );
+ if ( p_sys->i_poll_id >= 0 && p_sys->sock != SRT_INVALID_SOCK )
{
- msg_Err( p_stream, "Failed to send data to pipe");
+ p_sys->b_interrupted = true;
+
+ msg_Dbg( p_stream, "Waking up srt_epoll_wait");
+
+ /* Removing all socket descriptors from the monitoring list
+ * wakes up SRT's threads. We only have one to remove. */
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
}
+ vlc_mutex_unlock( &p_sys->lock );
}
static int Control(stream_t *p_stream, int i_query, va_list args)
@@ -102,97 +107,28 @@ static int Control(stream_t *p_stream, int i_query, va_list args)
return i_ret;
}
-static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof)
+static bool srt_schedule_reconnect(stream_t *p_stream)
{
- stream_sys_t *p_sys = p_stream->p_sys;
-
- block_t *pkt = block_Alloc( p_sys->i_chunk_size );
-
- if ( unlikely( pkt == NULL ) )
- {
- return NULL;
- }
-
- vlc_interrupt_register( srt_wait_interrupted, p_stream);
-
- SRTSOCKET ready[2];
-
- if ( srt_epoll_wait( p_sys->i_poll_id,
- ready, &(int) { 2 }, NULL, 0, p_sys->i_poll_timeout,
- &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 )
- {
- int srt_err = srt_getlasterror( NULL );
-
- /* Assuming that timeout error is normal when SRT socket is connected. */
- if ( srt_err == SRT_ETIMEOUT && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED )
- {
- goto skip;
- }
-
- msg_Err( p_stream, "released poll wait (reason : %s)", srt_getlasterror_str() );
- goto endofstream;
- }
-
- bool cancel = 0;
- int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) );
- if ( ret > 0 && cancel )
- {
- msg_Dbg( p_stream, "Cancelled running" );
- goto endofstream;
- }
-
- int stat = srt_recvmsg( p_sys->sock, (char *)pkt->p_buffer, p_sys->i_chunk_size );
-
- if ( stat == SRT_ERROR )
- {
- msg_Err( p_stream, "failed to recevie SRT packet (reason: %s)", srt_getlasterror_str() );
- goto endofstream;
- }
-
- pkt->i_buffer = stat;
- vlc_interrupt_unregister();
- return pkt;
-
-endofstream:
- msg_Dbg( p_stream, "EOS");
- *eof = true;
-skip:
- block_Release(pkt);
- srt_clearlasterror();
- vlc_interrupt_unregister();
-
- return NULL;
-}
+ int i_latency;
+ int stat;
+ char *psz_passphrase = NULL;
+ vlc_url_t parsed_url = { 0 };
-static int Open(vlc_object_t *p_this)
-{
- stream_t *p_stream = (stream_t*)p_this;
- stream_sys_t *p_sys = NULL;
- vlc_url_t parsed_url = { 0 };
struct addrinfo hints = {
.ai_socktype = SOCK_DGRAM,
}, *res = NULL;
- int stat;
- char *psz_passphrase = NULL;
-
- p_sys = vlc_obj_malloc( p_this, sizeof( *p_sys ) );
- if( unlikely( p_sys == NULL ) )
- return VLC_ENOMEM;
+ stream_sys_t *p_sys = p_stream->p_sys;
+ bool failed = false;
if ( vlc_UrlParse( &parsed_url, p_stream->psz_url ) == -1 )
{
- msg_Err( p_stream, "Failed to parse a given URL (%s)", p_stream->psz_url );
- goto failed;
- }
+ msg_Err( p_stream, "Failed to parse input URL (%s)",
+ p_stream->psz_url );
- p_sys->i_chunk_size = var_InheritInteger( p_stream, "chunk-size" );
- p_sys->i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" );
- p_sys->i_latency = var_InheritInteger( p_stream, "latency" );
- p_sys->i_poll_id = -1;
- p_stream->p_sys = p_sys;
- p_stream->pf_block = BlockSRT;
- p_stream->pf_control = Control;
+ failed = true;
+ goto out;
+ }
psz_passphrase = var_InheritString( p_stream, "passphrase" );
@@ -204,91 +140,216 @@ static int Open(vlc_object_t *p_this)
parsed_url.i_port,
gai_strerror( stat ) );
- goto failed;
+ failed = true;
+ goto out;
+ }
+
+ /* Always start with a fresh socket */
+ if (p_sys->sock != SRT_INVALID_SOCK)
+ {
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+ srt_close(p_sys->sock);
}
p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 );
- if ( p_sys->sock == SRT_ERROR )
+ if ( p_sys->sock == SRT_INVALID_SOCK )
{
msg_Err( p_stream, "Failed to open socket." );
- goto failed;
+ failed = true;
+ goto out;
}
/* Make SRT non-blocking */
- srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) );
+ srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN,
+ &(bool) { false }, sizeof( bool ) );
+ srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN,
+ &(bool) { false }, sizeof( bool ) );
/* Make sure TSBPD mode is enable (SRT mode) */
- srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) );
+ srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE,
+ &(int) { 1 }, sizeof( int ) );
+
+ /* This is an access module so it is always a receiver */
+ srt_setsockopt( p_sys->sock, 0, SRTO_SENDER,
+ &(int) { 0 }, sizeof( int ) );
/* Set latency */
- srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) );
+ i_latency = var_InheritInteger( p_stream, "latency" );
+ srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY,
+ &i_latency, sizeof( int ) );
if ( psz_passphrase != NULL && psz_passphrase[0] != '\0')
{
int i_key_length = var_InheritInteger( p_stream, "key-length" );
-
srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE,
psz_passphrase, strlen( psz_passphrase ) );
srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN,
&i_key_length, sizeof( int ) );
}
- p_sys->i_poll_id = srt_epoll_create();
- if ( p_sys->i_poll_id == -1 )
+ srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+ &(int) { SRT_EPOLL_ERR | SRT_EPOLL_IN });
+
+ /* Schedule a connect */
+ msg_Dbg( p_stream, "Schedule SRT connect (dest addresss: %s, port: %d).",
+ parsed_url.psz_host, parsed_url.i_port);
+
+ stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr));
+ if ( stat == SRT_ERROR )
{
- msg_Err( p_stream, "Failed to create poll id for SRT socket." );
- goto failed;
+ msg_Err( p_stream, "Failed to connect to server (reason: %s)",
+ srt_getlasterror_str() );
+ failed = true;
}
- if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 )
+out:
+ if (failed && p_sys->sock != SRT_INVALID_SOCK)
{
- msg_Err( p_stream, "Failed to create pipe fds." );
- goto failed;
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+ srt_close(p_sys->sock);
+ p_sys->sock = SRT_INVALID_SOCK;
}
- fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) );
+ vlc_UrlClean( &parsed_url );
+ freeaddrinfo( res );
+ free( psz_passphrase );
- srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_IN } );
- srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } );
+ return !failed;
+}
- stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr) );
+static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof)
+{
+ stream_sys_t *p_sys = p_stream->p_sys;
+ int i_chunk_size = var_InheritInteger( p_stream, "chunk-size" );
+ int i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" );
- if ( stat == SRT_ERROR )
+ if ( vlc_killed() )
{
- msg_Err( p_stream, "Failed to connect to server." );
- goto failed;
+ /* We are told to stop. Stop. */
+ return NULL;
}
- vlc_UrlClean( &parsed_url );
- freeaddrinfo( res );
- free (psz_passphrase);
-
- return VLC_SUCCESS;
+ block_t *pkt = block_Alloc( i_chunk_size );
+ if ( unlikely( pkt == NULL ) )
+ {
+ return NULL;
+ }
-failed:
+ vlc_interrupt_register( srt_wait_interrupted, p_stream);
- if ( parsed_url.psz_host != NULL
- && parsed_url.psz_buffer != NULL)
+ SRTSOCKET ready[1];
+ int readycnt = 1;
+ while ( srt_epoll_wait( p_sys->i_poll_id,
+ ready, &readycnt, 0, 0,
+ i_poll_timeout, NULL, 0, NULL, 0 ) >= 0)
{
- vlc_UrlClean( &parsed_url );
+ if ( readycnt < 0 || ready[0] != p_sys->sock )
+ {
+ /* should never happen, force recovery */
+ srt_close(p_sys->sock);
+ p_sys->sock = SRT_INVALID_SOCK;
+ }
+
+ switch( srt_getsockstate( p_sys->sock ) )
+ {
+ case SRTS_CONNECTED:
+ /* Good to go */
+ break;
+ case SRTS_BROKEN:
+ case SRTS_NONEXIST:
+ case SRTS_CLOSED:
+ /* Failed. Schedule recovery. */
+ if ( !srt_schedule_reconnect( p_stream ) )
+ msg_Err( p_stream, "Failed to schedule connect" );
+ /* Fall-through */
+ default:
+ /* Not ready */
+ continue;
+ }
+
+ int stat = srt_recvmsg( p_sys->sock,
+ (char *)pkt->p_buffer, i_chunk_size );
+ if ( stat > 0 )
+ {
+ pkt->i_buffer = stat;
+ goto out;
+ }
+
+ msg_Err( p_stream, "failed to receive packet, set EOS (reason: %s)",
+ srt_getlasterror_str() );
+ *eof = true;
+ break;
}
- if ( res != NULL )
+ /* if the poll reports errors for any reason at all
+ * including a timeout, or there is a read error,
+ * we skip the turn.
+ */
+
+ block_Release(pkt);
+ pkt = NULL;
+
+out:
+ vlc_interrupt_unregister();
+
+ /* Re-add the socket to the poll if we were interrupted */
+ vlc_mutex_lock( &p_sys->lock );
+ if ( p_sys->b_interrupted )
{
- freeaddrinfo( res );
+ srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock,
+ &(int) { SRT_EPOLL_ERR | SRT_EPOLL_IN } );
+ p_sys->b_interrupted = false;
}
+ vlc_mutex_unlock( &p_sys->lock );
- vlc_close( p_sys->i_pipe_fds[0] );
- vlc_close( p_sys->i_pipe_fds[1] );
+ return pkt;
+}
- if ( p_sys->i_poll_id != -1 )
+static int Open(vlc_object_t *p_this)
+{
+ stream_t *p_stream = (stream_t*)p_this;
+ stream_sys_t *p_sys = NULL;
+
+ p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+ if( unlikely( p_sys == NULL ) )
+ return VLC_ENOMEM;
+
+ srt_startup();
+
+ vlc_mutex_init( &p_sys->lock );
+
+ p_stream->p_sys = p_sys;
+
+ p_sys->i_poll_id = srt_epoll_create();
+ if ( p_sys->i_poll_id == -1 )
{
- srt_epoll_release( p_sys->i_poll_id );
- p_sys->i_poll_id = -1;
+ msg_Err( p_stream, "Failed to create poll id for SRT socket." );
+ goto failed;
}
- srt_close( p_sys->sock );
- free (psz_passphrase);
+ if ( !srt_schedule_reconnect( p_stream ) )
+ {
+ msg_Err( p_stream, "Failed to schedule connect");
+
+ goto failed;
+ }
+
+ p_stream->pf_block = BlockSRT;
+ p_stream->pf_control = Control;
+
+ return VLC_SUCCESS;
+
+failed:
+ vlc_mutex_destroy( &p_sys->lock );
+
+ if ( p_sys != NULL )
+ {
+ if ( p_sys->sock != -1 ) srt_close( p_sys->sock );
+ if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id );
+
+ vlc_obj_free( p_this, p_sys );
+ p_stream->p_sys = NULL;
+ }
return VLC_EGENERIC;
}
@@ -298,16 +359,19 @@ static void Close(vlc_object_t *p_this)
stream_t *p_stream = (stream_t*)p_this;
stream_sys_t *p_sys = p_stream->p_sys;
- if ( p_sys->i_poll_id != -1 )
+ if ( p_sys )
{
+ vlc_mutex_destroy( &p_sys->lock );
+
+ srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock );
+ srt_close( p_sys->sock );
srt_epoll_release( p_sys->i_poll_id );
- p_sys->i_poll_id = -1;
+
+ vlc_obj_free( p_this, p_sys );
+ p_stream->p_sys = NULL;
}
- msg_Dbg( p_stream, "closing server" );
- srt_close( p_sys->sock );
- vlc_close( p_sys->i_pipe_fds[0] );
- vlc_close( p_sys->i_pipe_fds[1] );
+ srt_cleanup();
}
/* Module descriptor */
--
2.16.1
More information about the vlc-devel
mailing list