[vlc-commits] access: rist: add support for multicast
Sergio Ammirata
git at videolan.org
Wed Mar 20 15:22:51 CET 2019
vlc | branch: master | Sergio Ammirata <sergio at ammirata.net> | Tue Mar 19 08:14:32 2019 -0400| [b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec] | committer: Thomas Guillem
access: rist: add support for multicast
Signed-off-by: Thomas Guillem <thomas at gllm.fr>
> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec
---
modules/access/rist.c | 150 +++++++++++++++++++++++++++++++++++++-------------
modules/access/rist.h | 32 ++++++++++-
2 files changed, 143 insertions(+), 39 deletions(-)
diff --git a/modules/access/rist.c b/modules/access/rist.c
index f6e572b863..e06828c88a 100644
--- a/modules/access/rist.c
+++ b/modules/access/rist.c
@@ -85,7 +85,11 @@ typedef struct
int i_max_packet_size;
int i_poll_timeout;
int i_poll_timeout_current;
- bool eof_on_reset;
+ bool b_ismulticast;
+ bool b_sendnacks;
+ bool b_sendblindnacks;
+ bool b_disablenacks;
+ bool b_flag_discontinuity;
block_fifo_t *p_fifo;
vlc_mutex_t lock;
uint64_t last_message;
@@ -97,6 +101,7 @@ typedef struct
float vbr_ratio;
uint16_t vbr_ratio_count;
uint32_t i_lost_packets;
+ uint32_t i_nack_packets;
uint32_t i_recovered_packets;
uint32_t i_reordered_packets;
uint32_t i_total_packets;
@@ -131,7 +136,7 @@ static struct rist_flow *rist_init_rx(void)
if (!flow)
return NULL;
- flow->reset = 2;
+ flow->reset = 1;
flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
if ( unlikely( flow->buffer == NULL ) )
@@ -139,6 +144,10 @@ static struct rist_flow *rist_init_rx(void)
free(flow);
return NULL;
}
+ flow->fd_in = -1;
+ flow->fd_nack = -1;
+ flow->fd_rtcp_m = -1;
+
return flow;
}
@@ -150,7 +159,7 @@ static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf,
vlc_mutex_unlock( &lock );
}
-static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url)
+static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url, bool b_ismulticast)
{
stream_sys_t *p_sys = p_access->p_sys;
msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
@@ -166,21 +175,47 @@ static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed
if (p_sys->flow->fd_in < 0)
{
msg_Err( p_access, "cannot open input socket" );
- return NULL;
+ goto fail;
}
- p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
- NULL, 0, IPPROTO_UDP);
+ if (b_ismulticast)
+ {
+ p_sys->flow->fd_rtcp_m = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+ NULL, 0, IPPROTO_UDP);
+ if (p_sys->flow->fd_rtcp_m < 0)
+ {
+ msg_Err( p_access, "cannot open multicast nack socket" );
+ goto fail;
+ }
+ p_sys->flow->fd_nack = net_ConnectDgram(p_access, parsed_url->psz_host,
+ parsed_url->i_port + 1, -1, IPPROTO_UDP );
+ }
+ else
+ {
+ p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+ NULL, 0, IPPROTO_UDP);
+ }
if (p_sys->flow->fd_nack < 0)
{
msg_Err( p_access, "cannot open nack socket" );
- return NULL;
+ goto fail;
}
populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
return p_sys->flow;
+
+fail:
+ if (p_sys->flow->fd_in != -1)
+ vlc_close(p_sys->flow->fd_in);
+ if (p_sys->flow->fd_nack != -1)
+ vlc_close(p_sys->flow->fd_nack);
+ if (p_sys->flow->fd_rtcp_m != -1)
+ vlc_close(p_sys->flow->fd_rtcp_m);
+ free(p_sys->flow->buffer);
+ free(p_sys->flow);
+ return NULL;
}
static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
@@ -263,8 +298,9 @@ static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uin
len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
/* Write to Socket */
- rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
- (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+ if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
+ rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
+ (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
free(buf);
buf = NULL;
}
@@ -302,8 +338,9 @@ static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uin
len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
/* Write to Socket */
- rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
- (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+ if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
+ rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
+ (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
free(buf);
buf = NULL;
}
@@ -361,6 +398,7 @@ static void send_nacks(stream_t *p_access, struct rist_flow *flow)
}
if (nacks_len > 0)
{
+ p_sys->i_nack_packets += nacks_len;
block_t *pkt_nacks = block_Alloc(nacks_len * 2);
if (pkt_nacks)
{
@@ -485,6 +523,10 @@ static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_
case RTCP_PT_SDES:
{
+ if (p_sys->b_sendnacks == false)
+ p_sys->b_sendnacks = true;
+ if (p_sys->b_ismulticast)
+ return;
/* Check for changes in source IP address or port */
int8_t name_length = rtcp_sdes_get_name_length(buf);
if (name_length > bytes_left)
@@ -535,6 +577,10 @@ static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_
break;
case RTCP_PT_SR:
+ if (p_sys->b_sendnacks == false)
+ p_sys->b_sendnacks = true;
+ if (p_sys->b_ismulticast)
+ return;
break;
default:
@@ -566,17 +612,8 @@ static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf,
uint32_t pkt_ts = rtp_get_timestamp(buf);
bool retrasnmitted = false;
bool success = true;
- uint64_t now = vlc_tick_now();
- if (flow->reset == 2)
- {
- if ((uint64_t)(now - p_sys->last_message) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) ) {
- msg_Info(p_access, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
- }
- p_sys->last_message = now;
- return success;
- }
- else if (flow->reset == 1)
+ if (flow->reset == 1)
{
msg_Info(p_access, "Traffic detected after buffer reset");
/* First packet in the queue */
@@ -585,6 +622,7 @@ static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf,
flow->wi = idx;
flow->ri = idx;
flow->reset = 0;
+ p_sys->b_flag_discontinuity = true;
}
/* Check to see if this is a retransmission or a regular packet */
@@ -725,6 +763,7 @@ static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount,
flow->ri, flow->wi);
p_sys->i_lost_packets += loss_amount;
+ p_sys->b_flag_discontinuity = true;
}
return pktout;
@@ -769,23 +808,30 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
uint64_t now;
*eof = false;
block_t *pktout = NULL;
- struct pollfd pfd[2];
+ struct pollfd pfd[3];
int ret;
ssize_t r;
struct sockaddr_storage peer;
socklen_t slen = sizeof(struct sockaddr_storage);
struct rist_flow *flow = p_sys->flow;
- if (vlc_killed() || (flow->reset == 1 && p_sys->eof_on_reset))
+ if (vlc_killed())
{
*eof = true;
return NULL;
}
+ int poll_sockets = 2;
pfd[0].fd = flow->fd_in;
pfd[0].events = POLLIN;
pfd[1].fd = flow->fd_nack;
pfd[1].events = POLLIN;
+ if (p_sys->b_ismulticast)
+ {
+ pfd[2].fd = flow->fd_rtcp_m;
+ pfd[2].events = POLLIN;
+ poll_sockets++;
+ }
/* The protocol uses a fifo buffer with a fixed time delay.
* That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
@@ -795,7 +841,7 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
* the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
* most cases. */
- ret = vlc_poll_i11e(pfd, 2, p_sys->i_poll_timeout_current);
+ ret = vlc_poll_i11e(pfd, poll_sockets, p_sys->i_poll_timeout_current);
if (unlikely(ret < 0))
return NULL;
else if (ret == 0)
@@ -827,6 +873,18 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
}
else {
+ if (p_sys->b_ismulticast == false)
+ rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
+ }
+ }
+ if (p_sys->b_ismulticast && pfd[2].revents & POLLIN)
+ {
+ r = rist_ReadFrom_i11e(flow->fd_rtcp_m, buf, p_sys->i_max_packet_size,
+ (struct sockaddr *)&peer, &slen);
+ if (unlikely(r == -1)) {
+ msg_Err(p_access, "mcast socket %d error: %s\n",flow->fd_rtcp_m, gai_strerror(errno));
+ }
+ else {
rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
}
}
@@ -853,11 +911,6 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
p_sys->i_poll_timeout_nonzero_count++;
}
}
- else
- {
- if (p_sys->eof_on_reset)
- *eof = true;
- }
}
}
@@ -901,13 +954,15 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
if (quality != 100)
- msg_Info(p_access, "STATS: Total %u, Recovered %u, Reordered %u, Lost %u, VBR Score " \
- "%.2f, Link Quality %.2f%%", p_sys->i_total_packets, p_sys->i_recovered_packets,
- p_sys->i_reordered_packets, p_sys->i_lost_packets, ratio, quality);
+ msg_Info(p_access, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
+ "Score %.2f, Link Quality %.2f%%", p_sys->i_total_packets,
+ p_sys->i_recovered_packets, p_sys->i_nack_packets, p_sys->i_reordered_packets,
+ p_sys->i_lost_packets, ratio, quality);
p_sys->i_last_stat = now;
p_sys->vbr_ratio = 0;
p_sys->vbr_ratio_count = 0;
p_sys->i_lost_packets = 0;
+ p_sys->i_nack_packets = 0;
p_sys->i_recovered_packets = 0;
p_sys->i_reordered_packets = 0;
p_sys->i_total_packets = 0;
@@ -943,7 +998,13 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
}
if (pktout)
+ {
+ if (p_sys->b_flag_discontinuity) {
+ pktout->i_flags |= BLOCK_FLAG_DISCONTINUITY;
+ p_sys->b_flag_discontinuity = false;
+ }
return pktout;
+ }
else
return NULL;
}
@@ -961,6 +1022,8 @@ static void Clean( stream_t *p_access )
net_Close (p_sys->flow->fd_in);
if (p_sys->flow->fd_nack >= 0)
net_Close (p_sys->flow->fd_nack);
+ if (p_sys->flow->fd_rtcp_m >= 0)
+ net_Close (p_sys->flow->fd_rtcp_m);
for (int i=0; i<RIST_QUEUE_SIZE; i++) {
struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
if (pkt->buffer && pkt->buffer->i_buffer > 0) {
@@ -1008,7 +1071,8 @@ static int Open(vlc_object_t *p_this)
}
/* Initialize rist flow */
- p_sys->flow = rist_udp_receiver(p_access, &parsed_url);
+ p_sys->b_ismulticast = is_multicast_address(parsed_url.psz_host);
+ p_sys->flow = rist_udp_receiver(p_access, &parsed_url, p_sys->b_ismulticast);
vlc_UrlClean( &parsed_url );
if (!p_sys->flow)
{
@@ -1017,14 +1081,23 @@ static int Open(vlc_object_t *p_this)
goto failed;
}
+ p_sys->b_flag_discontinuity = false;
+ p_sys->b_disablenacks = var_InheritBool( p_access, "disable-nacks" );
+ p_sys->b_sendblindnacks = var_InheritBool( p_access, "mcast-blind-nacks" );
+ if (p_sys->b_sendblindnacks && p_sys->b_disablenacks == false)
+ p_sys->b_sendnacks = true;
+ else
+ p_sys->b_sendnacks = false;
p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
- p_sys->eof_on_reset = var_InheritBool( p_access, "eof-on-reset" );
p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
- p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
+ if (p_sys->b_disablenacks)
+ p_sys->flow->reorder_buffer = p_sys->flow->latency;
+ else
+ p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
/* Convert to rtp times */
@@ -1073,11 +1146,14 @@ vlc_module_begin ()
add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
NULL, true )
add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
- add_bool( "eof-on-reset", false, "Trigger an EOF event when a buffer reset is triggered",
- "This is probably useful when you are decoding but not so much if you are streaming", true )
add_integer( "nack-type", NACK_FMT_RANGE,
N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
change_integer_list( nack_type, nack_type_names )
+ add_bool( "disable-nacks", false, "Disable NACK output packets",
+ "Use this to disable packet recovery", true )
+ add_bool( "mcast-blind-nacks", false, "Do not check for a valid rtcp message from the encoder",
+ "Send nack messages even when we have not confirmed that the encoder is on our local " \
+ "network.", true )
set_capability( "access", 0 )
add_shortcut( "rist", "tr06" )
diff --git a/modules/access/rist.h b/modules/access/rist.h
index 75ade277d2..ec0f6ae0f9 100644
--- a/modules/access/rist.h
+++ b/modules/access/rist.h
@@ -92,11 +92,11 @@ struct rist_flow {
char cname[MAX_CNAME];
struct sockaddr_storage peer_sockaddr;
socklen_t peer_socklen;
- uint16_t ri,
- wi;
+ uint16_t ri, wi;
int fd_in;
int fd_out;
int fd_rtcp;
+ int fd_rtcp_m;
int fd_nack;
uint8_t nacks_retries[RIST_QUEUE_SIZE];
uint32_t hi_timestamp;
@@ -337,3 +337,31 @@ static inline ssize_t rist_Write(int fd, const void *buf, size_t len)
{
return rist_WriteTo(fd, buf, len, NULL, 0);
}
+
+/* Tell whether psz_dst_server resolves to an IPv4 or IPv6 multicast group. */
+static bool is_multicast_address(char *psz_dst_server)
+{
+    bool ismulticast = false;
+    struct addrinfo hint = {
+        .ai_socktype = SOCK_DGRAM,
+        .ai_protocol = IPPROTO_UDP,
+        .ai_flags = AI_NUMERICSERV | AI_IDN | AI_PASSIVE,
+    }, *res;
+
+    /* A host that cannot be resolved is reported as not multicast. */
+    if (vlc_getaddrinfo(psz_dst_server, 0, &hint, &res))
+        return false;
+
+    /* Classify the resolved binary address rather than the URL text, so that
+     * host names and the whole ff00::/8 range are recognised (first result only). */
+    if (res->ai_family == AF_INET) {
+        const struct sockaddr_in *in4 = (const struct sockaddr_in *)res->ai_addr;
+        ismulticast = IN_MULTICAST(ntohl(in4->sin_addr.s_addr)) != 0;
+    } else if (res->ai_family == AF_INET6) {
+        const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)res->ai_addr;
+        ismulticast = IN6_IS_ADDR_MULTICAST(&in6->sin6_addr) != 0;
+    }
+
+    freeaddrinfo(res);
+    return ismulticast;
+}
\ No newline at end of file
More information about the vlc-commits
mailing list