[vlc-devel] [PATCH 3.0.x 3/4] access: backport rist modules
Thomas Guillem
thomas at gllm.fr
Mon Nov 2 14:03:38 CET 2020
From: Sergio Ammirata <sergio at ammirata.net>
(cherry picked from commit 2cb6e8459739b558acdf4382b8c510284276bd32)
(cherry picked from commit b504fabcedafc025a3e664a215146a34f746f2e6)
(cherry picked from commit b72164d2c788524601118cebc0e63fbf6bde4375)
(cherry picked from commit 271ad1ad199a97d60f32361fa9ae317948351361)
(cherry picked from commit dad24f0b3858e1cfbefb2d861ed12e3add4756b4)
(cherry picked from commit d22baf21918cd9186734668f31cb74cf5a92d3a5)
(cherry picked from commit 7a2bd115d7ef392d4b8e0fea2d971ddbfbf29a8b)
(cherry picked from commit 732201ca09b0d99ab68ec9613542fa72e6db3c1d)
(cherry picked from commit b1b18401024fce92712c8da58e8f81116b0a8008)
(cherry picked from commit aa3ced83d6c2c6496590461c539eefa1ab4308c8)
(cherry picked from commit 4077054930070e4f7a7bcd6e6f1bff81e6f264a7)
(cherry picked from commit c05d16945ee63e53eb93156dfc5204b6b24522d0)
(cherry picked from commit 33381800e6d66382fb1d0be690fc096ac564f3f3)
(cherry picked from commit 381c61cff48064236a1e3b2c396b763ab4b5075d)
(cherry picked from commit f487da48e6433a342e14e614c4c29f8a8d6ad310)
(cherry picked from commit 307e3b7f40ca5442d0dfc4b53483f536b9a7400d)
(cherry picked from commit d5e53a14c39b78e971d616bf5c8ea97e2d57767f)
(cherry picked from commit b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec)
(cherry picked from commit f2ed24657adc5e379e326dbba3ca8541291ed425)
(cherry picked from commit 9383ba42056f8742956671b16082d930237060fc)
(cherry picked from commit fe9517f53c98f43972480c93eb770f00f368c19b)
Signed-off-by: Thomas Guillem <thomas at gllm.fr>
---
configure.ac | 6 +
modules/access/Makefile.am | 9 +
modules/access/rist.c | 1162 +++++++++++++++++++++++++++++
modules/access/rist.h | 369 +++++++++
modules/access_output/Makefile.am | 8 +
modules/access_output/rist.c | 876 ++++++++++++++++++++++
6 files changed, 2430 insertions(+)
create mode 100644 modules/access/rist.c
create mode 100644 modules/access/rist.h
create mode 100644 modules/access_output/rist.c
diff --git a/configure.ac b/configure.ac
index c844053d4d..7de9515ad3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -4013,6 +4013,12 @@ AS_IF([test "${enable_lirc}" = "yes"], [
])
AM_CONDITIONAL([HAVE_LIRC], [test "${have_lirc}" = "yes"])
+dnl
+dnl Bitstream
+dnl
+PKG_CHECK_MODULES([BITSTREAM], [bitstream > 1.4],[have_bitstream="yes"], [have_bitstream="no"])
+AM_CONDITIONAL([HAVE_BITSTREAM], [test "${have_bitstream}" = "yes"])
+
dnl
dnl SRT plugin
dnl
diff --git a/modules/access/Makefile.am b/modules/access/Makefile.am
index 5d2a5f915c..671e31f770 100644
--- a/modules/access/Makefile.am
+++ b/modules/access/Makefile.am
@@ -428,3 +428,12 @@ libaccess_srt_plugin_la_LIBADD = $(SRT_LIBS)
libaccess_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(accessdir)'
access_LTLIBRARIES += $(LTLIBaccess_srt)
EXTRA_LTLIBRARIES += libaccess_srt_plugin.la
+
+### RIST ###
+
+librist_plugin_la_SOURCES = access/rist.c access/rist.h
+librist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
+librist_plugin_la_LIBADD = $(SOCKET_LIBS)
+if HAVE_BITSTREAM
+access_LTLIBRARIES += librist_plugin.la
+endif
diff --git a/modules/access/rist.c b/modules/access/rist.c
new file mode 100644
index 0000000000..bb229868e7
--- /dev/null
+++ b/modules/access/rist.c
@@ -0,0 +1,1162 @@
+/*****************************************************************************
+ * rist.c: RIST (Reliable Internet Stream Transport) input module
+ *****************************************************************************
+ * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
+ * Copyright (C) 2018-2020, SipRadius LLC
+ *
+ * Authors: Sergio Ammirata <sergio at ammirata.net>
+ * Daniele Lacamera <root at danielinux.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_common.h>
+#include <vlc_interrupt.h>
+#include <vlc_plugin.h>
+#include <vlc_access.h>
+#include <vlc_threads.h>
+#include <vlc_network.h>
+#include <vlc_block.h>
+#include <vlc_url.h>
+#include <poll.h>
+#include <bitstream/ietf/rtcp_rr.h>
+#include <bitstream/ietf/rtcp_sdes.h>
+#include <bitstream/ietf/rtcp_fb.h>
+#include <bitstream/ietf/rtp.h>
+
+#include "rist.h"
+
+/* The default latency is 1000 ms */
+#define RIST_DEFAULT_LATENCY 1000
+/* The default nack retry interval (unit not stated here; presumably ms -- TODO confirm) */
+#define RIST_DEFAULT_RETRY_INTERVAL 132
+/* The default packet re-ordering buffer (unit not stated here; presumably ms -- TODO confirm) */
+#define RIST_DEFAULT_REORDER_BUFFER 70
+/* The default max packet size, in bytes (size of the receive buffer) */
+#define RIST_MAX_PACKET_SIZE 1472
+/* The default poll timeout is 5 ms */
+#define RIST_DEFAULT_POLL_TIMEOUT 5
+/* The max retry count for nacks */
+#define RIST_MAX_RETRIES 10
+/* The rate at which we process and send nack requests */
+#define NACK_INTERVAL 5 /*ms*/
+/* Calculate and print stats once per second */
+#define STATS_INTERVAL 1000 /*ms*/
+
+static const int nack_type[] = { /* same values and order as enum NACK_TYPE below */
+ 0, 1,
+};
+
+static const char *const nack_type_names[] = { /* display names, index-aligned with nack_type[] */
+ N_("Range"), N_("Bitmask"),
+};
+
+enum NACK_TYPE { /* NACK packet format; selects send_rbnack() or send_bbnack() in rist_thread() */
+ NACK_FMT_RANGE = 0,
+ NACK_FMT_BITMASK
+};
+/*
+ * Private state of the RIST input (receiver) module.
+ */
+struct stream_sys_t
+{
+ struct rist_flow *flow; /* sockets, packet ring buffer and peer info (see rist_udp_receiver) */
+ char sender_name[MAX_CNAME]; /* last CNAME taken from an RTCP SDES packet (rtcp_input) */
+ enum NACK_TYPE nack_type; /* NACK format used by rist_thread */
+ uint64_t last_data_rx; /* mdate() of the last RTP packet stored by rist_input */
+ uint64_t last_nack_tx; /* last time BlockRIST called send_nacks */
+ vlc_thread_t thread; /* presumably the handle of rist_thread -- creation is not shown here */
+ int i_max_packet_size; /* size of the receive buffer allocated in BlockRIST */
+ int i_poll_timeout; /* configured poll timeout */
+ int i_poll_timeout_current; /* timeout passed to the next poll; 0 right after a packet was dequeued */
+ bool b_ismulticast;
+ bool b_sendnacks; /* set once an RTCP SDES or SR packet has been received */
+ bool b_sendblindnacks; /* not referenced in the visible part of this file */
+ bool b_disablenacks; /* when true, send_bbnack/send_rbnack do not transmit */
+ bool b_flag_discontinuity; /* next output block gets BLOCK_FLAG_DISCONTINUITY */
+ block_fifo_t *p_fifo; /* NACK lists queued by send_nacks, consumed by rist_thread */
+ vlc_mutex_t lock; /* taken around peer address updates in rtcp_input */
+ uint64_t last_message; /* not referenced in the visible part of this file */
+ uint64_t last_reset; /* last time BlockRIST reset the buffer because the input stalled */
+ /* stat variables */
+ uint32_t i_poll_timeout_zero_count; /* polls that were followed by a dequeued packet */
+ uint32_t i_poll_timeout_nonzero_count; /* polls that produced no output packet */
+ uint64_t i_last_stat; /* last time the statistics were printed and cleared */
+ float vbr_ratio; /* accumulated |1 - zero/nonzero poll ratio| samples */
+ uint16_t vbr_ratio_count; /* number of samples accumulated in vbr_ratio */
+ uint32_t i_lost_packets;
+ uint32_t i_nack_packets;
+ uint32_t i_recovered_packets;
+ uint32_t i_reordered_packets;
+ uint32_t i_total_packets;
+};
+
+/*
+ * Answer stream control queries.
+ *
+ * Seeking, fast seeking, pausing and pace control are all reported as unsupported.
+ * The PTS delay is derived from the inherited "network-caching" value.
+ * Any other query is rejected with VLC_EGENERIC.
+ */
+static int Control(stream_t *p_access, int i_query, va_list args)
+{
+    if (i_query == STREAM_GET_PTS_DELAY)
+    {
+        int64_t *pi_delay = va_arg( args, int64_t * );
+        *pi_delay = RIST_TICK_FROM_MS(
+            var_InheritInteger(p_access, "network-caching") );
+        return VLC_SUCCESS;
+    }
+
+    if (i_query == STREAM_CAN_SEEK || i_query == STREAM_CAN_FASTSEEK ||
+        i_query == STREAM_CAN_PAUSE || i_query == STREAM_CAN_CONTROL_PACE)
+    {
+        /* a live network feed supports none of these capabilities */
+        bool *pb_capability = va_arg( args, bool * );
+        *pb_capability = false;
+        return VLC_SUCCESS;
+    }
+
+    return VLC_EGENERIC;
+}
+
+/*
+ * Allocate a zeroed receive flow together with its packet ring buffer.
+ *
+ * The flow starts in the "reset" state and with all three socket descriptors
+ * marked as not open (-1). Returns NULL if either allocation fails.
+ */
+static struct rist_flow *rist_init_rx(void)
+{
+    struct rist_flow *rx = calloc(1, sizeof(struct rist_flow));
+    if (rx == NULL)
+        return NULL;
+
+    rx->reset = 1;
+    rx->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
+    if ( unlikely( rx->buffer == NULL ) )
+    {
+        free(rx);
+        return NULL;
+    }
+
+    /* no socket has been opened yet */
+    rx->fd_in = -1;
+    rx->fd_nack = -1;
+    rx->fd_rtcp_m = -1;
+
+    return rx;
+}
+/*
+ * Write len bytes of buf to peer on fd through rist_WriteTo_i11e(), bracketed by a
+ * lock/unlock of the given mutex.
+ *
+ * NOTE(review): `lock` is received BY VALUE, so the lock/unlock below act on this
+ * function's private copy of the mutex, not on the mutex owned by the caller. As written
+ * this provides no mutual exclusion with other users of the caller's mutex; the parameter
+ * should become a `vlc_mutex_t *` (together with every caller).
+ */
+static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf, size_t len,
+ const struct sockaddr *peer, socklen_t slen)
+{
+ vlc_mutex_lock( &lock );
+ rist_WriteTo_i11e(fd, buf, len, peer, slen);
+ vlc_mutex_unlock( &lock );
+}
+
+/*
+ * Create the receive flow and open its UDP sockets.
+ *
+ * The data socket listens on host:port. The RTCP/NACK socket uses port + 1:
+ *  - unicast:   a listening socket on host:port+1;
+ *  - multicast: a listening socket on host:port+1 for incoming RTCP (fd_rtcp_m) plus a
+ *               connected socket to host:port+1 used to send NACKs (fd_nack).
+ *
+ * On success the flow is stored in p_sys->flow and returned. On failure every socket
+ * opened so far is closed, the flow is freed, p_sys->flow is reset to NULL (so that no
+ * dangling pointer is left behind for later cleanup code) and NULL is returned.
+ */
+static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url, bool b_ismulticast)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
+             parsed_url->psz_host, parsed_url->i_port,
+             parsed_url->psz_host, parsed_url->i_port+1);
+
+    struct rist_flow *flow = rist_init_rx();
+    p_sys->flow = flow;
+    if (!flow)
+        return NULL;
+
+    flow->fd_in = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port, NULL,
+                0, IPPROTO_UDP);
+    if (flow->fd_in < 0)
+    {
+        msg_Err( p_access, "cannot open input socket" );
+        goto fail;
+    }
+
+    if (b_ismulticast)
+    {
+        flow->fd_rtcp_m = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+            NULL, 0, IPPROTO_UDP);
+        if (flow->fd_rtcp_m < 0)
+        {
+            msg_Err( p_access, "cannot open multicast nack socket" );
+            goto fail;
+        }
+        flow->fd_nack = net_ConnectDgram(p_access, parsed_url->psz_host,
+            parsed_url->i_port + 1, -1, IPPROTO_UDP );
+    }
+    else
+    {
+        flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+            NULL, 0, IPPROTO_UDP);
+    }
+    if (flow->fd_nack < 0)
+    {
+        msg_Err( p_access, "cannot open nack socket" );
+        goto fail;
+    }
+
+    populate_cname(flow->fd_nack, flow->cname);
+    msg_Info(p_access, "our cname is %s", flow->cname);
+
+    return flow;
+
+fail:
+    /* sockets are released with net_Close(), like in Clean() */
+    if (flow->fd_in >= 0)
+        net_Close(flow->fd_in);
+    if (flow->fd_nack >= 0)
+        net_Close(flow->fd_nack);
+    if (flow->fd_rtcp_m >= 0)
+        net_Close(flow->fd_rtcp_m);
+    free(flow->buffer);
+    free(flow);
+    /* do not leave a pointer to freed memory in the module state */
+    p_sys->flow = NULL;
+    return NULL;
+}
+
+/*
+ * Tell whether idx lies in the half-open window (ri, wi] of the ring buffer.
+ *
+ * When the window does not wrap (ri <= wi) both bounds must hold; when it wraps around
+ * the end of the 16-bit index space, satisfying either bound is enough.
+ * Returns 1 when idx is inside the window, 0 otherwise.
+ */
+static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
+{
+    const bool past_read = idx > flow->ri;
+    const bool upto_write = idx <= flow->wi;
+    const bool wrapped = flow->ri > flow->wi;
+
+    return wrapped ? (past_read || upto_write) : (past_read && upto_write);
+}
+
+/*
+ * Build and send the periodic RTCP feedback: an empty Receiver Report followed by an SDES
+ * packet carrying our CNAME, written to the NACK socket towards the current peer.
+ *
+ * The packet buffer is zero-initialised so that the fields that are not explicitly set
+ * (SDES SSRC, padding after the CNAME) never leak uninitialised heap memory on the wire.
+ * The write is done while holding p_sys->lock, the mutex that protects the peer address
+ * updated by rtcp_input().
+ */
+static void send_rtcp_feedback(stream_t *p_access, struct rist_flow *flow)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    int namelen = strlen(flow->cname) + 1;
+
+    /* we need to make sure it is a multiple of 4, pad if necessary */
+    if ((namelen - 2) & 0x3)
+        namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
+
+    int rtcp_feedback_size = RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE + namelen;
+    uint8_t *buf = calloc(1, rtcp_feedback_size);
+    if ( unlikely( buf == NULL ) )
+        return;
+
+    /* Populate RR */
+    uint8_t *rr = buf;
+    rtp_set_hdr(rr);
+    rtcp_rr_set_pt(rr);
+    rtcp_set_length(rr, 1);
+    rtcp_fb_set_int_ssrc_pkt_sender(rr, 0);
+
+    /* Populate SDES */
+    uint8_t *p_sdes = (buf + RTCP_EMPTY_RR_SIZE);
+    rtp_set_hdr(p_sdes);
+    rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
+    rtcp_sdes_set_pt(p_sdes);
+    rtcp_set_length(p_sdes, (namelen >> 2) + 2);
+    rtcp_sdes_set_cname(p_sdes, 1);
+    rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
+    uint8_t *p_sdes_name = (buf + RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE);
+    strlcpy((char *)p_sdes_name, flow->cname, namelen);
+
+    /* Write to Socket, holding the real mutex (not a by-value copy of it) */
+    vlc_mutex_lock( &p_sys->lock );
+    rist_WriteTo_i11e(flow->fd_nack, buf, rtcp_feedback_size,
+        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    vlc_mutex_unlock( &p_sys->lock );
+    free(buf);
+}
+
+/*
+ * Send a bitmask-based NACK (RTCP generic feedback) listing the given sequence numbers.
+ *
+ * pkt_nacks carries an array of 16-bit sequence numbers; nack_count is the number of
+ * entries to request. One FCI record is emitted per sequence number, with an empty
+ * bitmask (no grouping yet).
+ *
+ * Nothing is built or sent while NACKs are not enabled (no SDES/SR seen yet) or when
+ * they are disabled. The packet buffer is zero-initialised so the SSRC fields that are
+ * not set here do not carry uninitialised heap memory, and the sequence numbers are read
+ * straight from the block, bounded by its size, instead of being copied unchecked into a
+ * fixed-size stack array.
+ */
+static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+
+    if (!p_sys->b_sendnacks || p_sys->b_disablenacks)
+        return;
+
+    /* never read more entries than the block actually holds */
+    size_t available = pkt_nacks->i_buffer / sizeof(uint16_t);
+    if (nack_count > available)
+        nack_count = (uint16_t)available;
+
+    size_t len = RTCP_FB_HEADER_SIZE;
+    size_t bbnack_bufsize = len + (size_t)RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
+    uint8_t *buf = calloc(1, bbnack_bufsize);
+    if ( unlikely( buf == NULL ) )
+        return;
+
+    /* Populate NACKS */
+    uint8_t *nack = buf;
+    rtp_set_hdr(nack);
+    rtcp_fb_set_fmt(nack, NACK_FMT_BITMASK);
+    rtcp_set_pt(nack, RTCP_PT_RTPFB);
+    rtcp_set_length(nack, 2 + nack_count);
+    /* TODO : group together */
+    for (size_t i = 0; i < nack_count; i++) {
+        uint16_t seq;
+        memcpy(&seq, pkt_nacks->p_buffer + i * sizeof(seq), sizeof(seq));
+        uint8_t *nack_record = buf + len;
+        rtcp_fb_nack_set_packet_id(nack_record, seq);
+        rtcp_fb_nack_set_bitmask_lost(nack_record, 0);
+        len += RTCP_FB_FCI_GENERIC_NACK_SIZE;
+    }
+
+    /* Write to Socket, holding the real mutex (not a by-value copy of it) */
+    vlc_mutex_lock( &p_sys->lock );
+    rist_WriteTo_i11e(fd_nack, buf, len,
+        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    vlc_mutex_unlock( &p_sys->lock );
+    free(buf);
+}
+
+/*
+ * Send a range-based NACK listing the given sequence numbers.
+ *
+ * pkt_nacks carries an array of 16-bit sequence numbers; nack_count is the number of
+ * entries to request. One range record is emitted per sequence number, with zero extra
+ * packets in the range (no grouping yet). The media source field is set to "RIST".
+ *
+ * Nothing is built or sent while NACKs are not enabled (no SDES/SR seen yet) or when
+ * they are disabled. The packet buffer is zero-initialised so the sender SSRC field that
+ * is not set here does not carry uninitialised heap memory, and the sequence numbers are
+ * read straight from the block, bounded by its size, instead of being copied unchecked
+ * into a fixed-size stack array.
+ */
+static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+
+    if (!p_sys->b_sendnacks || p_sys->b_disablenacks)
+        return;
+
+    /* never read more entries than the block actually holds */
+    size_t available = pkt_nacks->i_buffer / sizeof(uint16_t);
+    if (nack_count > available)
+        nack_count = (uint16_t)available;
+
+    size_t len = RTCP_FB_HEADER_SIZE;
+    size_t rbnack_bufsize = len + (size_t)RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
+    uint8_t *buf = calloc(1, rbnack_bufsize);
+    if ( unlikely( buf == NULL ) )
+        return;
+
+    /* Populate NACKS */
+    uint8_t *nack = buf;
+    rtp_set_hdr(nack);
+    rtcp_fb_set_fmt(nack, NACK_FMT_RANGE);
+    rtcp_set_pt(nack, RTCP_PT_RTPFR);
+    rtcp_set_length(nack, 2 + nack_count);
+    uint8_t name[4] = "RIST";
+    rtcp_fb_set_ssrc_media_src(nack, name);
+    /* TODO : group together */
+    for (size_t i = 0; i < nack_count; i++)
+    {
+        uint16_t seq;
+        memcpy(&seq, pkt_nacks->p_buffer + i * sizeof(seq), sizeof(seq));
+        uint8_t *nack_record = buf + len;
+        rtcp_fb_nack_set_range_start(nack_record, seq);
+        rtcp_fb_nack_set_range_extra(nack_record, 0);
+        len += RTCP_FB_FCI_GENERIC_NACK_SIZE;
+    }
+
+    /* Write to Socket, holding the real mutex (not a by-value copy of it) */
+    vlc_mutex_lock( &p_sys->lock );
+    rist_WriteTo_i11e(fd_nack, buf, len,
+        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    vlc_mutex_unlock( &p_sys->lock );
+    free(buf);
+}
+/*
+ * Walk the ring buffer from the slot after the read index (ri) up to the write index
+ * (wi), collect the sequence numbers of empty slots whose NACK has matured, and queue
+ * them as one block on p_sys->p_fifo (consumed by rist_thread, which does the sending).
+ *
+ * A hole is requested when its age exceeds its expiration and its retry count has not
+ * passed flow->max_retries. The age is measured from the RTP timestamp of the closest
+ * preceding slot that holds a packet (0 if none was seen in this scan) to
+ * flow->hi_timestamp.
+ */
+static void send_nacks(stream_t *p_access, struct rist_flow *flow)
+{
+ stream_sys_t *p_sys = p_access->p_sys;
+ struct rtp_pkt *pkt;
+ uint16_t idx;
+ uint64_t last_ts = 0;
+ uint16_t null_count = 0;
+ int nacks_len = 0;
+ uint16_t nacks[MAX_NACKS];
+
+ idx = flow->ri;
+ while(idx++ != flow->wi) /* idx is 16-bit, so the walk wraps around naturally */
+ {
+ pkt = &(flow->buffer[idx]);
+ if (pkt->buffer == NULL)
+ {
+ if (nacks_len + 1 >= MAX_NACKS) /* request list is full, stop scanning */
+ {
+ break;
+ }
+ else
+ {
+ null_count++;
+ /* TODO: after adding average spacing calculation, change this formula
+ to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
+ uint64_t extrapolated_ts = last_ts;
+ /* Find out the age and add it only if necessary */
+ int retry_count = flow->nacks_retries[idx];
+ uint64_t age = flow->hi_timestamp - extrapolated_ts;
+ uint64_t expiration;
+ if (retry_count == 0){ /* first request: wait out the reorder buffer */
+ expiration = flow->reorder_buffer;
+ } else { /* later requests: one retry interval per request already sent */
+ expiration = (uint64_t)flow->nacks_retries[idx] * (uint64_t)flow->retry_interval;
+ }
+ if (age > expiration && retry_count <= flow->max_retries)
+ {
+ flow->nacks_retries[idx]++;
+ nacks[nacks_len++] = idx;
+ msg_Dbg(p_access, "Sending NACK for seq %d, age %"PRId64" ms, retry %u, " \
+ "expiration %"PRId64" ms", idx, ts_get_from_rtp(age)/1000,
+ flow->nacks_retries[idx], ts_get_from_rtp(expiration)/1000);
+ }
+ }
+ }
+ else
+ {
+ last_ts = pkt->rtp_ts;
+ null_count = 0;
+ }
+ }
+ if (nacks_len > 0)
+ {
+ p_sys->i_nack_packets += nacks_len;
+ block_t *pkt_nacks = block_Alloc(nacks_len * 2); /* two bytes per sequence number */
+ if (pkt_nacks)
+ {
+ memcpy(pkt_nacks->p_buffer, nacks, nacks_len * 2);
+ pkt_nacks->i_buffer = nacks_len * 2;
+ block_FifoPut( p_sys->p_fifo, pkt_nacks );
+ }
+ }
+}
+
+/*
+ * Three-way comparison of two socket addresses.
+ *
+ * Addresses are ordered by family first. IPv4 addresses are then ordered by host-order
+ * address and port; IPv6 addresses by raw address bytes (memcmp result returned as is),
+ * host-order port, flow info and scope id. Addresses of any other family compare equal
+ * once their families match.
+ * Returns 0 when equal, a negative value when x sorts first, a positive value otherwise.
+ */
+static int sockaddr_cmp(struct sockaddr *x, struct sockaddr *y)
+{
+    if (x->sa_family != y->sa_family)
+        return x->sa_family < y->sa_family ? -1 : 1;
+
+    switch (x->sa_family)
+    {
+        case AF_INET:
+        {
+            struct sockaddr_in *lhs = (void*)x, *rhs = (void*)y;
+
+            if (ntohl(lhs->sin_addr.s_addr) != ntohl(rhs->sin_addr.s_addr))
+                return ntohl(lhs->sin_addr.s_addr) < ntohl(rhs->sin_addr.s_addr) ? -1 : 1;
+            if (ntohs(lhs->sin_port) != ntohs(rhs->sin_port))
+                return ntohs(lhs->sin_port) < ntohs(rhs->sin_port) ? -1 : 1;
+            break;
+        }
+
+        case AF_INET6:
+        {
+            struct sockaddr_in6 *lhs = (void*)x, *rhs = (void*)y;
+
+            int addr_order = memcmp(lhs->sin6_addr.s6_addr, rhs->sin6_addr.s6_addr,
+                sizeof(lhs->sin6_addr.s6_addr));
+            if (addr_order != 0)
+                return addr_order;
+            if (ntohs(lhs->sin6_port) != ntohs(rhs->sin6_port))
+                return ntohs(lhs->sin6_port) < ntohs(rhs->sin6_port) ? -1 : 1;
+            if (lhs->sin6_flowinfo != rhs->sin6_flowinfo)
+                return lhs->sin6_flowinfo < rhs->sin6_flowinfo ? -1 : 1;
+            if (lhs->sin6_scope_id != rhs->sin6_scope_id)
+                return lhs->sin6_scope_id < rhs->sin6_scope_id ? -1 : 1;
+            break;
+        }
+
+        default:
+            break;
+    }
+
+    return 0;
+}
+
+/*
+ * Log a change of the peer address from x (old) to y (new).
+ *
+ * Both addresses are interpreted according to the family of x; families other than
+ * AF_INET and AF_INET6 are silently ignored.
+ *
+ * Each address is rendered with inet_ntop() into its own buffer. inet_ntoa() must not be
+ * used twice within one argument list because it returns a single static buffer, which
+ * made the old and the new address print identically. The size handed to inet_ntop() is
+ * the size of the destination text buffer, not the size of the binary address.
+ */
+static void print_sockaddr_info_change(stream_t *p_access, struct sockaddr *x, struct sockaddr *y)
+{
+    char oldstr[INET6_ADDRSTRLEN];
+    char newstr[INET6_ADDRSTRLEN];
+
+    if (x->sa_family == AF_INET)
+    {
+        struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
+        if (inet_ntop(AF_INET, &xin->sin_addr, oldstr, sizeof(oldstr)) == NULL)
+            strcpy(oldstr, "?");
+        if (inet_ntop(AF_INET, &yin->sin_addr, newstr, sizeof(newstr)) == NULL)
+            strcpy(newstr, "?");
+        msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
+            oldstr, ntohs(xin->sin_port), newstr, ntohs(yin->sin_port));
+    }
+    else if (x->sa_family == AF_INET6)
+    {
+        struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
+        if (inet_ntop(AF_INET6, &xin6->sin6_addr, oldstr, sizeof(oldstr)) == NULL)
+            strcpy(oldstr, "?");
+        if (inet_ntop(AF_INET6, &yin6->sin6_addr, newstr, sizeof(newstr)) == NULL)
+            strcpy(newstr, "?");
+        msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
+            oldstr, ntohs(xin6->sin6_port), newstr, ntohs(yin6->sin6_port));
+    }
+}
+
+/*
+ * Log the address and port of peer x. Families other than AF_INET and AF_INET6 are
+ * silently ignored.
+ *
+ * The address is rendered with inet_ntop(), whose size argument is the size of the
+ * destination text buffer (passing the size of the binary address made most IPv6
+ * addresses fail to format and left the buffer uninitialised).
+ */
+static void print_sockaddr_info(stream_t *p_access, struct sockaddr *x)
+{
+    char str[INET6_ADDRSTRLEN];
+
+    if (x->sa_family == AF_INET)
+    {
+        struct sockaddr_in *xin = (void*)x;
+        if (inet_ntop(AF_INET, &xin->sin_addr, str, sizeof(str)) == NULL)
+            strcpy(str, "?");
+        msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin->sin_port));
+    }
+    else if (x->sa_family == AF_INET6)
+    {
+        struct sockaddr_in6 *xin6 = (void*)x;
+        if (inet_ntop(AF_INET6, &xin6->sin6_addr, str, sizeof(str)) == NULL)
+            strcpy(str, "?");
+        msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin6->sin6_port));
+    }
+}
+
+/*
+ * Parse a received (possibly compound) RTCP datagram.
+ *
+ * buf_in/len describe the datagram, peer/slen the address it came from (peer must point
+ * to storage as large as struct sockaddr_storage, as it is copied whole).
+ *
+ * SDES and SR packets enable NACK sending. In unicast mode an SDES packet is also used to
+ * learn the peer address and CNAME; a change of either one requests a buffer reset.
+ * In multicast mode processing stops at the first SDES or SR packet.
+ *
+ * All offsets and sizes are kept in size_t: the packet length is taken from the network
+ * and, once multiplied by four, does not fit 16 bits (a truncated size of 0 used to make
+ * this loop spin forever). Every sub-packet is checked against the bytes really left in
+ * the datagram, and the SDES item against the sub-packet that contains it.
+ */
+static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_in, size_t len,
+    struct sockaddr *peer, socklen_t slen)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    size_t processed_bytes = 0;
+    char new_sender_name[MAX_CNAME];
+
+    while (processed_bytes < len) {
+        uint8_t *buf = buf_in + processed_bytes;
+        /* safety checks */
+        size_t bytes_left = len - processed_bytes;
+        if ( bytes_left < 4 )
+        {
+            /* we must have at least 4 bytes */
+            msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %zu",
+                bytes_left);
+            return;
+        }
+        else if (!rtp_check_hdr(buf))
+        {
+            /* check for a valid rtp header */
+            msg_Err(p_access, "Malformed rtcp packet starting with %02x, ignoring.", buf[0]);
+            return;
+        }
+
+        uint8_t ptype = rtcp_get_pt(buf);
+        uint16_t records = rtcp_get_length(buf);
+        /* length field counts 32-bit words minus one; always >= 4, so the loop advances */
+        size_t bytes = 4 * ((size_t)records + 1);
+        if (bytes > bytes_left)
+        {
+            /* check for a sane number of bytes */
+            msg_Err(p_access, "Malformed rtcp packet, wrong len %d, expecting %zu bytes in the " \
+                "packet, got a buffer of %zu bytes.", records, bytes, bytes_left);
+            return;
+        }
+
+        switch(ptype) {
+            case RTCP_PT_RTPFR:
+            case RTCP_PT_RTPFB:
+                break;
+
+            case RTCP_PT_RR:
+                break;
+
+            case RTCP_PT_SDES:
+                {
+                    if (p_sys->b_sendnacks == false)
+                        p_sys->b_sendnacks = true;
+                    if (p_sys->b_ismulticast)
+                        return;
+                    /* the item header must be inside the packet before it is read */
+                    if (bytes < (size_t)RTCP_SDES_SIZE)
+                    {
+                        msg_Err(p_access, "Malformed SDES packet, wrong cname len %d, got a " \
+                            "buffer of %zu bytes.", 0, bytes);
+                        return;
+                    }
+                    /* Check for changes in source IP address or port */
+                    int name_length = rtcp_sdes_get_name_length(buf);
+                    if (name_length <= 0 ||
+                        (size_t)name_length >= sizeof(new_sender_name) ||
+                        (size_t)RTCP_SDES_SIZE + (size_t)name_length > bytes)
+                    {
+                        /* check for a sane number of bytes; the name must fit in the
+                         * packet and leave room for the terminating NUL locally */
+                        msg_Err(p_access, "Malformed SDES packet, wrong cname len %d, got a " \
+                            "buffer of %zu bytes.", name_length, bytes);
+                        return;
+                    }
+                    bool ip_port_changed = false;
+                    if (sockaddr_cmp((struct sockaddr *)&flow->peer_sockaddr, peer) != 0)
+                    {
+                        ip_port_changed = true;
+                        if(flow->peer_socklen > 0)
+                            print_sockaddr_info_change(p_access,
+                                (struct sockaddr *)&flow->peer_sockaddr, peer);
+                        else
+                            print_sockaddr_info(p_access, peer);
+                        vlc_mutex_lock( &p_sys->lock );
+                        memcpy(&flow->peer_sockaddr, peer, sizeof(struct sockaddr_storage));
+                        flow->peer_socklen = slen;
+                        vlc_mutex_unlock( &p_sys->lock );
+                    }
+
+                    /* Check for changes in cname */
+                    bool peer_name_changed = false;
+                    memset(new_sender_name, 0, MAX_CNAME);
+                    memcpy(new_sender_name, buf + RTCP_SDES_SIZE, name_length);
+                    /* compare whole names: a new name that is a prefix of the old one
+                     * is a change too */
+                    if (strncmp(new_sender_name, p_sys->sender_name, MAX_CNAME) != 0)
+                    {
+                        peer_name_changed = true;
+                        if (strcmp(p_sys->sender_name, "") == 0)
+                            msg_Info(p_access, "Peer Name: %s", new_sender_name);
+                        else
+                            msg_Info(p_access, "Peer Name change detected: old Name: %s, new " \
+                                "Name: %s", p_sys->sender_name, new_sender_name);
+                        memset(p_sys->sender_name, 0, MAX_CNAME);
+                        memcpy(p_sys->sender_name, buf + RTCP_SDES_SIZE, name_length);
+                    }
+
+                    /* Reset the buffer as the source must have been restarted */
+                    if (peer_name_changed || ip_port_changed)
+                    {
+                        /* reset the buffer */
+                        flow->reset = 1;
+                    }
+                }
+                break;
+
+            case RTCP_PT_SR:
+                if (p_sys->b_sendnacks == false)
+                    p_sys->b_sendnacks = true;
+                if (p_sys->b_ismulticast)
+                    return;
+                break;
+
+            default:
+                msg_Err(p_access, "   Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
+        }
+        processed_bytes += bytes;
+    }
+}
+/*
+ * Store one received RTP packet (buf/len, header included) in the ring buffer slot
+ * selected by its 16-bit sequence number, and update the window state.
+ *
+ * Returns false when the packet is rejected (too short, bad header, allocation failure)
+ * or when a stream discontinuity was detected, in which case flow->reset is set;
+ * returns true otherwise.
+ */
+static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf, size_t len)
+{
+ stream_sys_t *p_sys = p_access->p_sys;
+
+ /* safety checks */
+ if ( len < RTP_HEADER_SIZE )
+ {
+ /* check if packet size >= rtp header size */
+ msg_Err(p_access, "Rist rtp packet must have at least 12 bytes, we have %zu", len);
+ return false;
+ }
+ else if (!rtp_check_hdr(buf))
+ {
+ /* check for a valid rtp header */
+ msg_Err(p_access, "Malformed rtp packet header starting with %02x, ignoring.", buf[0]);
+ return false;
+ }
+
+ uint16_t idx = rtp_get_seqnum(buf);
+ uint32_t pkt_ts = rtp_get_timestamp(buf);
+ bool retrasnmitted = false;
+ bool success = true;
+
+ if (flow->reset == 1)
+ {
+ msg_Info(p_access, "Traffic detected after buffer reset");
+ /* First packet in the queue */
+ flow->hi_timestamp = pkt_ts;
+ msg_Info(p_access, "ts@%u", flow->hi_timestamp);
+ flow->wi = idx;
+ flow->ri = idx;
+ flow->reset = 0;
+ p_sys->b_flag_discontinuity = true;
+ }
+
+ /* Check to see if this is a retransmission or a regular packet */
+ if (buf[11] & (1 << 0)) /* lowest bit of the last RTP header byte marks a retransmission */
+ {
+ msg_Dbg(p_access, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx, flow->ri, flow->wi,
+ flow->wi-flow->ri);
+ p_sys->i_recovered_packets++;
+ retrasnmitted = true;
+ }
+ else if (flow->wi != flow->ri)
+ {
+ /* Reset counter to 0 on incoming holes */
+ /* Regular packets only as retransmits are expected to come in out of order */
+ uint16_t idxnext = (uint16_t)(flow->wi + 1);
+ if (idx != idxnext)
+ {
+ if (idx > idxnext) {
+ msg_Dbg(p_access, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
+ idx, idxnext, idx - idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
+ } else {
+ p_sys->i_reordered_packets++;
+ msg_Dbg(p_access, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx,
+ idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
+ }
+ uint16_t zero_counter = (uint16_t)(flow->wi + 1);
+ while(zero_counter++ != idx) { /* 16-bit counter: wraps around until it reaches idx */
+ flow->nacks_retries[zero_counter] = 0;
+ }
+ /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
+ idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
+ }
+ }
+
+ /* Always replace the existing one with the new one */
+ struct rtp_pkt *pkt;
+ pkt = &(flow->buffer[idx]);
+ if (pkt->buffer && pkt->buffer->i_buffer > 0)
+ {
+ block_Release(pkt->buffer);
+ pkt->buffer = NULL;
+ }
+ pkt->buffer = block_Alloc(len);
+ if (!pkt->buffer)
+ return false;
+
+ pkt->buffer->i_buffer = len;
+ memcpy(pkt->buffer->p_buffer, buf, len);
+ pkt->rtp_ts = pkt_ts;
+ p_sys->last_data_rx = mdate();
+ /* Reset the try counter regardless of whether it was a retransmit or not */
+ flow->nacks_retries[idx] = 0;
+
+ if (retrasnmitted) /* retransmissions never move the window or the packet counter */
+ return success;
+
+ p_sys->i_total_packets++;
+ /* Perform discontinuity checks and update counters */
+ if (!is_index_in_range(flow, idx) && pkt_ts >= flow->hi_timestamp)
+ {
+ if ((pkt_ts - flow->hi_timestamp) > flow->hi_timestamp/10)
+ {
+ msg_Info(p_access, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
+ flow->wi, pkt_ts, flow->hi_timestamp);
+ flow->reset = 1;
+ success = false;
+ }
+ else
+ {
+ flow->wi = idx; /* packet extends the window: it becomes the new write index */
+ flow->hi_timestamp = pkt_ts;
+ }
+ }
+ else if (!is_index_in_range(flow, idx))
+ {
+ /* incoming timestamp just jumped back in time or index is outside of scope */
+ msg_Info(p_access, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
+ flow->wi, pkt_ts, flow->hi_timestamp);
+ flow->reset = 1;
+ success = false;
+ }
+
+ return success;
+}
+/*
+ * Take the next deliverable packet out of the ring buffer.
+ *
+ * Scans from the slot after the read index up to the write index and stops at the first
+ * stored packet whose RTP timestamp plus flow->rtp_latency is older than
+ * flow->hi_timestamp. That packet is copied, without its RTP header, into a new block
+ * which is returned; the stored copy is released and the read index moves to its slot.
+ * Empty slots skipped before it are counted as lost packets.
+ *
+ * Returns NULL when the window is empty, a reset is pending, no packet is due yet, or
+ * the output block could not be allocated.
+ */
+static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
+{
+ stream_sys_t *p_sys = p_access->p_sys;
+ block_t *pktout = NULL;
+ struct rtp_pkt *pkt;
+ uint16_t idx;
+ if (flow->ri == flow->wi || flow->reset > 0)
+ return NULL;
+
+ idx = flow->ri;
+ bool found_data = false;
+ uint16_t loss_amount = 0;
+ while(idx++ != flow->wi) {
+
+ pkt = &(flow->buffer[idx]);
+ if (!pkt->buffer)
+ {
+ /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
+ loss_amount++;
+ /* We move ahead until we find a timestamp but we do not move the cursor.
+ * None of them are guaranteed packet loss because we do not really
+ * know their timestamps. They might still arrive on the next loop.
+ * We can confirm the loss only if we get a valid packet in the loop below. */
+ continue;
+ }
+
+ /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx,
+ flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
+ if (flow->hi_timestamp > (uint32_t)(pkt->rtp_ts + flow->rtp_latency))
+ {
+ /* Populate output packet now but remove rtp header from source */
+ int newSize = pkt->buffer->i_buffer - RTP_HEADER_SIZE;
+ pktout = block_Alloc(newSize);
+ if (pktout)
+ {
+ pktout->i_buffer = newSize;
+ memcpy(pktout->p_buffer, pkt->buffer->p_buffer + RTP_HEADER_SIZE, newSize);
+ /* free the buffer and increase the read index */
+ flow->ri = idx;
+ /* TODO: calculate average duration using buffer average (bring from sender) */
+ found_data = true;
+ }
+ block_Release(pkt->buffer); /* released even when the output allocation failed */
+ pkt->buffer = NULL;
+ break;
+ }
+
+ }
+
+ if (loss_amount > 0 && found_data == true)
+ {
+ /* Packet loss confirmed, we found valid data after the holes */
+ msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount,
+ flow->ri, flow->wi);
+ p_sys->i_lost_packets += loss_amount;
+ p_sys->b_flag_discontinuity = true;
+ }
+
+ return pktout;
+}
+
+/*
+ * NACK sender thread entry point (data is the stream_t of the access).
+ *
+ * Waits for NACK lists on p_sys->p_fifo and transmits each of them in the configured
+ * NACK format. Cancellation is disabled while a list is being processed. The loop never
+ * terminates by itself.
+ */
+static void *rist_thread(void *data)
+{
+    stream_t *p_access = data;
+    stream_sys_t *p_sys = p_access->p_sys;
+
+    /* Process nacks every 5ms */
+    /* We only ask for the relevant ones */
+    while (true)
+    {
+        block_t *p_request = block_FifoGet(p_sys->p_fifo);
+
+        int canc = vlc_savecancel();
+
+        /* there are two bytes per nack */
+        const uint16_t nack_count = (uint16_t)p_request->i_buffer/2;
+        if (p_sys->nack_type == NACK_FMT_BITMASK)
+            send_bbnack(p_access, p_sys->flow->fd_nack, p_request, nack_count);
+        else
+            send_rbnack(p_access, p_sys->flow->fd_nack, p_request, nack_count);
+
+        if (nack_count > 1)
+            msg_Dbg(p_access, "Sent %u NACKs !!!", nack_count);
+        block_Release(p_request);
+
+        vlc_restorecancel (canc);
+    }
+
+    return NULL;
+}
+
+/*
+ * Block callback of the access: wait (at most the current poll timeout) for traffic on
+ * the data and RTCP sockets, feed what was received to rtcp_input()/rist_input(), and
+ * return the next packet that is due for delivery, if any.
+ *
+ * On every call it also runs the periodic work: poll-ratio sampling and RTCP feedback
+ * every RTCP_INTERVAL, statistics every STATS_INTERVAL, NACK scheduling every
+ * NACK_INTERVAL, and a buffer reset when no data arrived for longer than the latency.
+ *
+ * *eof is set to true only when the calling thread has been killed. Returns a block with
+ * the RTP header removed (flagged BLOCK_FLAG_DISCONTINUITY when a discontinuity is
+ * pending), or NULL when nothing is due.
+ *
+ * Socket errors are reported with vlc_strerror_c(): errno values must not be passed to
+ * gai_strerror(), which only translates getaddrinfo() error codes. The address length
+ * given to each receive call is reset first, because it is a value-result argument.
+ */
+static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    uint64_t now;
+    *eof = false;
+    block_t *pktout = NULL;
+    struct pollfd pfd[3];
+    int ret;
+    ssize_t r;
+    struct sockaddr_storage peer;
+    socklen_t slen;
+    struct rist_flow *flow = p_sys->flow;
+
+    if (vlc_killed())
+    {
+        *eof = true;
+        return NULL;
+    }
+
+    int poll_sockets = 2;
+    pfd[0].fd = flow->fd_in;
+    pfd[0].events = POLLIN;
+    pfd[1].fd = flow->fd_nack;
+    pfd[1].events = POLLIN;
+    if (p_sys->b_ismulticast)
+    {
+        pfd[2].fd = flow->fd_rtcp_m;
+        pfd[2].events = POLLIN;
+        poll_sockets++;
+    }
+
+    /* The protocol uses a fifo buffer with a fixed time delay.
+     * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
+     * packets. If I waited indefinitely for data coming in, the rate and delay of output packets
+     * would be wrong. I am calling the rist_dequeue function every time a data packet comes in
+     * and also every time we get a poll timeout. The configurable poll timeout is for controlling
+     * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
+     * most cases. */
+
+    ret = vlc_poll_i11e(pfd, poll_sockets, p_sys->i_poll_timeout_current);
+    if (unlikely(ret < 0))
+        return NULL;
+    else if (ret == 0)
+    {
+        /* Poll timeout, check the queue for the next packet that needs to be delivered */
+        pktout = rist_dequeue(p_access, flow);
+        /* if there is data, we need to come back faster to finish emptying it */
+        if (pktout) {
+            p_sys->i_poll_timeout_current = 0;
+            p_sys->i_poll_timeout_zero_count++;
+        } else {
+            p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
+            p_sys->i_poll_timeout_nonzero_count++;
+        }
+    }
+    else
+    {
+
+        uint8_t *buf = malloc(p_sys->i_max_packet_size);
+        if ( unlikely( buf == NULL ) )
+            return NULL;
+
+        /* Process rtcp incoming data */
+        if (pfd[1].revents & POLLIN)
+        {
+            slen = sizeof(struct sockaddr_storage);
+            r = rist_ReadFrom_i11e(flow->fd_nack, buf, p_sys->i_max_packet_size,
+                (struct sockaddr *)&peer, &slen);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, vlc_strerror_c(errno));
+            }
+            else {
+                if (p_sys->b_ismulticast == false)
+                    rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
+            }
+        }
+        if (p_sys->b_ismulticast && pfd[2].revents & POLLIN)
+        {
+            slen = sizeof(struct sockaddr_storage);
+            r = rist_ReadFrom_i11e(flow->fd_rtcp_m, buf, p_sys->i_max_packet_size,
+                (struct sockaddr *)&peer, &slen);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "mcast socket %d error: %s\n",flow->fd_rtcp_m, vlc_strerror_c(errno));
+            }
+            else {
+                rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
+            }
+        }
+
+        /* Process regular incoming data */
+        if (pfd[0].revents & POLLIN)
+        {
+            r = rist_Read_i11e(flow->fd_in, buf, p_sys->i_max_packet_size);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "socket %d error: %s\n", flow->fd_in, vlc_strerror_c(errno));
+            }
+            else
+            {
+                /* rist_input will process and queue the pkt */
+                if (rist_input(p_access, flow, buf, r))
+                {
+                    /* Check the queue for the next packet that needs to be delivered */
+                    pktout = rist_dequeue(p_access, flow);
+                    if (pktout) {
+                        p_sys->i_poll_timeout_current = 0;
+                        p_sys->i_poll_timeout_zero_count++;
+                    } else {
+                        p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
+                        p_sys->i_poll_timeout_nonzero_count++;
+                    }
+                }
+            }
+        }
+
+        free(buf);
+        buf = NULL;
+    }
+
+    now = mdate();
+
+    /* Process stats and print them out */
+    /* We need to measure some items every RTCP_INTERVAL */
+    uint64_t interval = (now - flow->feedback_time);
+    if ( interval > RIST_TICK_FROM_MS(RTCP_INTERVAL) )
+    {
+        if (p_sys->i_poll_timeout_nonzero_count > 0)
+        {
+            float ratio = (float)p_sys->i_poll_timeout_zero_count
+                / (float)p_sys->i_poll_timeout_nonzero_count;
+            if (ratio <= 1)
+                p_sys->vbr_ratio += 1 - ratio;
+            else
+                p_sys->vbr_ratio += ratio - 1;
+            p_sys->vbr_ratio_count++;
+            /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f",
+                p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
+            p_sys->i_poll_timeout_zero_count = 0;
+            p_sys->i_poll_timeout_nonzero_count = 0;
+        }
+    }
+    /* We print out the stats once per second */
+    interval = (now - p_sys->i_last_stat);
+    if ( interval > RIST_TICK_FROM_MS(STATS_INTERVAL) )
+    {
+        if ( p_sys->i_lost_packets > 0)
+            msg_Err(p_access, "We have %d lost packets", p_sys->i_lost_packets);
+        float ratio = 1;
+        if (p_sys->vbr_ratio_count > 0)
+            ratio = p_sys->vbr_ratio / (float)p_sys->vbr_ratio_count;
+        float quality = 100;
+        if (p_sys->i_total_packets > 0)
+            quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
+                p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
+        if (quality != 100)
+            msg_Info(p_access, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
+                "Score %.2f, Link Quality %.2f%%", p_sys->i_total_packets,
+                p_sys->i_recovered_packets, p_sys->i_nack_packets, p_sys->i_reordered_packets,
+                p_sys->i_lost_packets, ratio, quality);
+        p_sys->i_last_stat = now;
+        p_sys->vbr_ratio = 0;
+        p_sys->vbr_ratio_count = 0;
+        p_sys->i_lost_packets = 0;
+        p_sys->i_nack_packets = 0;
+        p_sys->i_recovered_packets = 0;
+        p_sys->i_reordered_packets = 0;
+        p_sys->i_total_packets = 0;
+    }
+
+    /* Send rtcp feedback every RTCP_INTERVAL */
+    interval = (now - flow->feedback_time);
+    if ( interval > RIST_TICK_FROM_MS(RTCP_INTERVAL) )
+    {
+        /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval,
+        RIST_TICK_FROM_MS(RTCP_INTERVAL)); */
+        send_rtcp_feedback(p_access, flow);
+        flow->feedback_time = now;
+    }
+
+    /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
+    interval = (now - p_sys->last_nack_tx);
+    if ( interval > RIST_TICK_FROM_MS(NACK_INTERVAL) )
+    {
+        send_nacks(p_access, p_sys->flow);
+        p_sys->last_nack_tx = now;
+    }
+
+    /* Safety check for when the input stream stalls */
+    if ( p_sys->last_data_rx > 0 && now > p_sys->last_data_rx &&
+        (uint64_t)(now - p_sys->last_data_rx) >  (uint64_t)RIST_TICK_FROM_MS(flow->latency) &&
+        (uint64_t)(now - p_sys->last_reset) > (uint64_t)RIST_TICK_FROM_MS(flow->latency) )
+    {
+        msg_Err(p_access, "No data received for %"PRId64" ms, resetting buffers",
+            (int64_t)(now - p_sys->last_data_rx)/1000);
+        p_sys->last_reset = now;
+        flow->reset = 1;
+    }
+
+    if (pktout)
+    {
+        if (p_sys->b_flag_discontinuity) {
+            pktout->i_flags |= BLOCK_FLAG_DISCONTINUITY;
+            p_sys->b_flag_discontinuity = false;
+        }
+        return pktout;
+    }
+    else
+        return NULL;
+}
+
+/**
+ * Releases everything the access owns: the output fifo, the flow sockets,
+ * the blocks still parked in the packet queue, the flow itself and the lock.
+ *
+ * Called by Close() once the worker thread has been joined, and by Open() on
+ * its error paths, where p_fifo and flow may still be NULL.
+ * p_sys itself is not freed here.
+ */
+static void Clean( stream_t *p_access )
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+
+    if( likely(p_sys->p_fifo != NULL) )
+        block_FifoRelease( p_sys->p_fifo );
+
+    if (p_sys->flow)
+    {
+        if (p_sys->flow->fd_in >= 0)
+            net_Close (p_sys->flow->fd_in);
+        if (p_sys->flow->fd_nack >= 0)
+            net_Close (p_sys->flow->fd_nack);
+        if (p_sys->flow->fd_rtcp_m >= 0)
+            net_Close (p_sys->flow->fd_rtcp_m);
+        for (int i=0; i<RIST_QUEUE_SIZE; i++) {
+            struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
+            /* Release every block still attached to a slot. Also requiring
+             * i_buffer > 0 would leak blocks whose length is zero. */
+            if (pkt->buffer != NULL) {
+                block_Release(pkt->buffer);
+                pkt->buffer = NULL;
+            }
+        }
+        free(p_sys->flow->buffer);
+        free(p_sys->flow);
+    }
+
+    vlc_mutex_destroy( &p_sys->lock );
+}
+
+/**
+ * Module close callback: stops the feedback worker thread, then hands the
+ * access over to Clean() to release its resources.
+ */
+static void Close(vlc_object_t *p_this)
+{
+    stream_t *p_access = (stream_t*)p_this;
+    stream_sys_t *state = p_access->p_sys;
+
+    /* The worker must be gone before Clean() tears down what it uses. */
+    vlc_cancel(state->thread);
+    vlc_join(state->thread, NULL);
+    Clean( p_access );
+}
+
+/**
+ * Module open callback for the RIST input access.
+ *
+ * Allocates the private state, parses the URL, opens the RIST flow, reads
+ * the module options, creates the output fifo and starts the worker thread
+ * that sends feedback/nack packets.
+ *
+ * @param p_this the stream_t being opened
+ * @return VLC_SUCCESS, VLC_ENOMEM if the private state cannot be allocated,
+ *         or VLC_EGENERIC on any later failure (after Clean() has run)
+ */
+static int Open(vlc_object_t *p_this)
+{
+    stream_t *p_access = (stream_t*)p_this;
+    stream_sys_t *p_sys = NULL;
+    vlc_url_t parsed_url = { 0 };
+
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+    if( unlikely( p_sys == NULL ) )
+        return VLC_ENOMEM;
+
+    p_access->p_sys = p_sys;
+
+    vlc_mutex_init( &p_sys->lock );
+
+    if ( vlc_UrlParse( &parsed_url, p_access->psz_url ) == -1 )
+    {
+        msg_Err( p_access, "Failed to parse input URL (%s)",
+                 p_access->psz_url );
+        /* Release whatever the parser allocated before failing, so the
+         * error path does not leak it. */
+        vlc_UrlClean( &parsed_url );
+        goto failed;
+    }
+
+    /* Initialize rist flow */
+    p_sys->b_ismulticast = is_multicast_address(parsed_url.psz_host);
+    p_sys->flow = rist_udp_receiver(p_access, &parsed_url, p_sys->b_ismulticast);
+    vlc_UrlClean( &parsed_url );
+    if (!p_sys->flow)
+    {
+        msg_Err( p_access, "Failed to open rist flow (%s)",
+                 p_access->psz_url );
+        goto failed;
+    }
+
+    p_sys->b_flag_discontinuity = false;
+    p_sys->b_disablenacks = var_InheritBool( p_access, "disable-nacks" );
+    p_sys->b_sendblindnacks = var_InheritBool( p_access, "mcast-blind-nacks" );
+    /* Nacks start enabled only in blind mode and when not disabled outright. */
+    if (p_sys->b_sendblindnacks && p_sys->b_disablenacks == false)
+        p_sys->b_sendnacks = true;
+    else
+        p_sys->b_sendnacks = false;
+    p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
+    p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
+    p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
+    p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
+    p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
+    p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
+    /* Without nacks the reorder buffer takes the whole latency. */
+    if (p_sys->b_disablenacks)
+        p_sys->flow->reorder_buffer = p_sys->flow->latency;
+    else
+        p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
+    msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
+
+    /* Convert to rtp times */
+    p_sys->flow->rtp_latency = rtp_get_ts(RIST_TICK_FROM_MS(p_sys->flow->latency));
+    p_sys->flow->retry_interval = rtp_get_ts(RIST_TICK_FROM_MS(p_sys->flow->retry_interval));
+    p_sys->flow->reorder_buffer = rtp_get_ts(RIST_TICK_FROM_MS(p_sys->flow->reorder_buffer));
+
+    p_sys->p_fifo = block_FifoNew();
+    if( unlikely(p_sys->p_fifo == NULL) )
+        goto failed;
+
+    /* This extra thread is for sending feedback/nack packets even when no data comes in */
+    if (vlc_clone(&p_sys->thread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
+    {
+        msg_Err(p_access, "Failed to create worker thread.");
+        goto failed;
+    }
+
+    p_access->pf_block = BlockRIST;
+    p_access->pf_control = Control;
+
+    return VLC_SUCCESS;
+
+failed:
+    Clean( p_access );
+    return VLC_EGENERIC;
+}
+
+/* Module descriptor: registers the RIST input as an "access" module reachable
+ * through the rist:// and tr06:// shortcuts, and declares its options.
+ * Every user-visible string is marked with N_() for translation. */
+vlc_module_begin ()
+
+    set_shortname( N_("RIST") )
+    set_description( N_("RIST input") )
+    set_category( CAT_INPUT )
+    set_subcategory( SUBCAT_INPUT_ACCESS )
+
+    add_integer( "packet-size", RIST_MAX_PACKET_SIZE,
+        N_("RIST maximum packet size (bytes)"), NULL, true )
+    add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT,
+        N_("RIST demux/decode maximum jitter (default is 5ms)"),
+        N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
+            "The lower the value, the more CPU cycles the algorithm will consume"), true )
+    add_integer( "latency", RIST_DEFAULT_LATENCY, N_("RIST latency (ms)"), NULL, true )
+    add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL, N_("RIST nack retry interval (ms)"),
+        NULL, true )
+    add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
+        NULL, true )
+    add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
+    add_integer( "nack-type", NACK_FMT_RANGE,
+        N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
+        change_integer_list( nack_type, nack_type_names )
+    add_bool( "disable-nacks", false, N_("Disable NACK output packets"),
+        N_("Use this to disable packet recovery"), true )
+    add_bool( "mcast-blind-nacks", false,
+        N_("Do not check for a valid rtcp message from the encoder"),
+        N_("Send nack messages even when we have not confirmed that the encoder is on our local "
+            "network."), true )
+
+    set_capability( "access", 0 )
+    add_shortcut( "rist", "tr06" )
+
+    set_callbacks( Open, Close )
+
+vlc_module_end ()
diff --git a/modules/access/rist.h b/modules/access/rist.h
new file mode 100644
index 0000000000..263b43cb2e
--- /dev/null
+++ b/modules/access/rist.h
@@ -0,0 +1,369 @@
+/*****************************************************************************
+ * rist.h: RIST (Reliable Internet Stream Transport) helper
+ *****************************************************************************
+ * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
+ * Copyright (C) 2018, SipRadius LLC
+ *
+ * Authors: Sergio Ammirata <sergio at ammirata.net>
+ * Daniele Lacamera <root at danielinux.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#include <stdint.h>
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+#include <errno.h>
+#include <assert.h>
+
+/*****************************************************************************
+ * Public API
+ *****************************************************************************/
+
+/* RIST */
+
+/* RTP header format (RFC 3550) */
+/*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ |V=2|P|X| CC |M| PT | sequence number |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | timestamp |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | synchronization source (SSRC) identifier |
+ +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
+ | contributing source (CSRC) identifiers |
+ | .... |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+
+#define RIST_QUEUE_SIZE 65536
+#define RTP_PKT_SIZE (1472)
+
+#define RTCP_INTERVAL 75 /*ms*/
+
+#define SEVENTY_YEARS_OFFSET (2208988800ULL)
+#define MAX_NACKS 32
+#define MAX_CNAME 128
+#define RTCP_EMPTY_RR_SIZE 8
+
+#define RTCP_PT_RTPFR 204
+
+#define RIST_TICK_FROM_MS(ms) ((CLOCK_FREQ / INT64_C(1000)) * (ms))
+
+/* One slot of the packet queue referenced by struct rist_flow::buffer. */
+struct rtp_pkt {
+ uint32_t rtp_ts; /* RTP timestamp recorded for the packet held in this slot */
+ struct block_t *buffer; /* the stored packet; NULL while the slot is empty */
+};
+
+/* RIST NACK header format */
+/*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | SNBase low bits | Length recovery |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ |E| PT recovery | Mask |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | TS recovery |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ |N|D|type |index| Offset | NA |SNBase ext bits|
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+
+/* State of one RIST flow, shared by the input and output modules. */
+struct rist_flow {
+ uint8_t reset; /* 1 until traffic is (re)detected; the input sets it again when the stream stalls */
+ struct rtp_pkt *buffer; /* array of RIST_QUEUE_SIZE slots, indexed by RTP sequence number */
+ uint32_t qsize; /* NOTE(review): not used in the visible code - confirm meaning */
+ uint32_t last_out; /* sequence number of the last packet sent (set by the sender thread) */
+ uint32_t ssrc; /* NOTE(review): presumably the flow SSRC - confirm against the users */
+ char cname[MAX_CNAME]; /* NUL-terminated CNAME advertised in RTCP SDES */
+ struct sockaddr_storage peer_sockaddr; /* NOTE(review): presumably the remote peer address - confirm */
+ socklen_t peer_socklen; /* length of peer_sockaddr */
+ uint16_t ri, wi; /* queue indexes; wi is the sequence number written last, ri presumably the read index */
+ int fd_in; /* data input socket (closed by the input module) */
+ int fd_out; /* data output socket (used by the output module) */
+ int fd_rtcp; /* RTCP socket */
+ int fd_rtcp_m; /* extra RTCP socket opened for multicast */
+ int fd_nack; /* nack socket (closed by the input module) */
+ uint8_t nacks_retries[RIST_QUEUE_SIZE]; /* NOTE(review): presumably a per-sequence nack retry counter - confirm */
+ uint32_t hi_timestamp; /* RTP timestamp of the most recent packet */
+ uint64_t feedback_time; /* mdate() value of the last RTCP feedback sent */
+ uint32_t latency; /* configured latency, in ms */
+ uint32_t rtp_latency; /* latency converted to RTP (90 kHz) units */
+ uint32_t retry_interval; /* nack retry interval; the input converts it from ms to RTP units */
+ uint32_t reorder_buffer; /* reorder buffer; the input converts it from ms to RTP units */
+ uint8_t max_retries; /* maximum nack retry count */
+ uint32_t packets_count; /* packets sent, reported in the RTCP sender report */
+ uint32_t bytes_count; /* bytes sent, reported in the RTCP sender report */
+};
+
+/* Reads the big-endian 16-bit "range start" sequence number stored in bytes
+ * 0-1 of a range nack record. */
+static inline uint16_t rtcp_fb_nack_get_range_start(const uint8_t *p_rtcp_fb_nack)
+{
+    const unsigned msb = p_rtcp_fb_nack[0];
+    const unsigned lsb = p_rtcp_fb_nack[1];
+
+    return (uint16_t)(msb * 256u + lsb);
+}
+
+/* Reads the big-endian 16-bit count of additional packets stored in bytes
+ * 2-3 of a range nack record. */
+static inline uint16_t rtcp_fb_nack_get_range_extra(const uint8_t *p_rtcp_fb_nack)
+{
+    const unsigned msb = p_rtcp_fb_nack[2];
+    const unsigned lsb = p_rtcp_fb_nack[3];
+
+    return (uint16_t)(msb * 256u + lsb);
+}
+
+/* Stores start as a big-endian 16-bit value in bytes 0-1 of a range nack
+ * record. */
+static inline void rtcp_fb_nack_set_range_start(uint8_t *p_rtcp_fb_nack,
+    uint16_t start)
+{
+    p_rtcp_fb_nack[1] = (uint8_t)(start % 256u);
+    p_rtcp_fb_nack[0] = (uint8_t)(start / 256u);
+}
+
+/* Stores extra as a big-endian 16-bit value in bytes 2-3 of a range nack
+ * record. */
+static inline void rtcp_fb_nack_set_range_extra(uint8_t *p_rtcp_fb_nack,
+    uint16_t extra)
+{
+    p_rtcp_fb_nack[3] = (uint8_t)(extra % 256u);
+    p_rtcp_fb_nack[2] = (uint8_t)(extra / 256u);
+}
+
+/**
+ * Builds the RTCP CNAME for this endpoint.
+ *
+ * The result is "hostname@address:port", using the local address of the
+ * socket as returned by getsockname(). It falls back to the bare hostname
+ * when the address cannot be obtained or formatted.
+ *
+ * @param fd socket whose local address is used
+ * @param identifier output buffer of at least MAX_CNAME bytes; always
+ *                   NUL-terminated on return (snprintf() truncates if needed)
+ */
+static inline void populate_cname(int fd, char *identifier)
+{
+    char hostname[MAX_CNAME];
+    char addr[INET6_ADDRSTRLEN];
+    struct sockaddr_storage local;
+    /* getsockname() reads this as the buffer capacity; passing 0 makes it
+     * store nothing and leaves the address uninitialised. */
+    socklen_t local_len = sizeof(local);
+    unsigned port = 0;
+    bool have_addr = false;
+
+    if (gethostname(hostname, MAX_CNAME) == -1)
+        snprintf(hostname, MAX_CNAME, "UnknownHost");
+    /* gethostname() does not guarantee termination when it truncates. */
+    hostname[MAX_CNAME - 1] = '\0';
+
+    if (getsockname(fd, (struct sockaddr *)&local, &local_len) == 0)
+    {
+        if (local.ss_family == AF_INET) {
+            struct sockaddr_in *xin = (void *)&local;
+            /* inet_ntop() instead of inet_ntoa(): no shared static buffer. */
+            have_addr = inet_ntop(AF_INET, &xin->sin_addr, addr, sizeof(addr)) != NULL;
+            port = ntohs(xin->sin_port);
+        } else if (local.ss_family == AF_INET6) {
+            struct sockaddr_in6 *xin6 = (void *)&local;
+            /* The size argument is the text buffer size, not the size of the
+             * binary address. */
+            have_addr = inet_ntop(AF_INET6, &xin6->sin6_addr, addr, sizeof(addr)) != NULL;
+            port = ntohs(xin6->sin6_port);
+        }
+    }
+
+    if (have_addr)
+        snprintf(identifier, MAX_CNAME, "%s@%s:%u", hostname, addr, port);
+    else
+        snprintf(identifier, MAX_CNAME, "%s", hostname);
+}
+
+/* Converts a VLC timestamp (CLOCK_FREQ units) to a 90 kHz RTP timestamp.
+ * The value is split into whole seconds and a remainder so the multiplication
+ * cannot overflow; the result is truncated to 32 bits. */
+static inline uint32_t rtp_get_ts( int64_t i_pts )
+{
+    const int64_t rate = 90000;
+    const int64_t whole = i_pts / CLOCK_FREQ;
+    const int64_t part = i_pts % CLOCK_FREQ;
+
+    return whole * rate + part * rate / CLOCK_FREQ;
+}
+
+/* Converts a 90 kHz RTP timestamp (or timestamp difference) to CLOCK_FREQ
+ * units. The multiplication is done before the division: CLOCK_FREQ / 90000
+ * evaluated first is an integer division that truncates the ratio and skews
+ * every result. A 32-bit input times CLOCK_FREQ fits easily in 64 bits. */
+static inline int64_t ts_get_from_rtp( uint32_t i_rtp_ts )
+{
+    const int64_t i_clock_rate = 90000;
+    return (int64_t)i_rtp_ts * (int64_t)CLOCK_FREQ / i_clock_rate;
+}
+
+/**
+ * Interruptible datagram read with a single retry.
+ *
+ * Reads with vlc_recv_i11e() when peer is NULL, vlc_recvfrom_i11e()
+ * otherwise. If the first attempt fails with EAGAIN or EINTR and the calling
+ * context has not been killed, the read is attempted exactly once more.
+ *
+ * @return the byte count of the last attempt, or -1 on error
+ */
+static inline ssize_t rist_ReadFrom_i11e(int fd, void *buf, size_t len, struct sockaddr *peer,
+    socklen_t *slen)
+{
+    for (int attempt = 0; ; attempt++)
+    {
+        ssize_t got = (peer != NULL) ? vlc_recvfrom_i11e(fd, buf, len, 0, peer, slen)
+                                     : vlc_recv_i11e(fd, buf, len, 0);
+
+        /* Success, or the one allowed retry has been spent. */
+        if (got != -1 || attempt == 1)
+            return got;
+        /* Only transient errors are worth a retry. */
+        if (errno != EAGAIN && errno != EINTR)
+            return got;
+        if (vlc_killed())
+            return got;
+    }
+}
+
+/* Convenience wrapper: rist_ReadFrom_i11e() without retrieving the sender
+ * address (peer and slen are passed as NULL). */
+static inline ssize_t rist_Read_i11e(int fd, void *buf, size_t len)
+{
+ return rist_ReadFrom_i11e(fd, buf, len, NULL, NULL);
+}
+
+/**
+ * Datagram read with a single retry.
+ *
+ * Reads with recv() when peer is NULL, recvfrom() otherwise. If the first
+ * attempt fails with EAGAIN or EINTR, the read is attempted exactly once more.
+ *
+ * @return the byte count of the last attempt, or -1 on error
+ */
+static inline ssize_t rist_ReadFrom(int fd, void *buf, size_t len, struct sockaddr *peer,
+    socklen_t *slen)
+{
+    for (int attempt = 0; ; attempt++)
+    {
+        ssize_t got = (peer != NULL) ? recvfrom(fd, buf, len, 0, peer, slen)
+                                     : recv(fd, buf, len, 0);
+
+        /* Success, or the one allowed retry has been spent. */
+        if (got != -1 || attempt == 1)
+            return got;
+        /* Only transient errors are worth a retry. */
+        if (errno != EAGAIN && errno != EINTR)
+            return got;
+    }
+}
+
+/* Convenience wrapper: rist_ReadFrom() without retrieving the sender address
+ * (peer and slen are passed as NULL). */
+static inline ssize_t rist_Read(int fd, void *buf, size_t len)
+{
+ return rist_ReadFrom(fd, buf, len, NULL, NULL);
+}
+
+/**
+ * Interruptible datagram write with a single retry.
+ *
+ * Sends with vlc_send_i11e() when slen is 0, vlc_sendto_i11e() otherwise.
+ * A failure with EAGAIN, EWOULDBLOCK, ENOBUFS or ENOMEM is returned as is.
+ * Any other failure on a SOCK_DGRAM socket is treated as an ICMP soft error
+ * and the send is attempted once more.
+ *
+ * @return the byte count of the last attempt, or -1 on error
+ */
+static inline ssize_t rist_WriteTo_i11e(int fd, const void *buf, size_t len,
+    const struct sockaddr *peer, socklen_t slen)
+{
+#ifdef _WIN32
+    # define ENOBUFS WSAENOBUFS
+    # define EAGAIN WSAEWOULDBLOCK
+    # define EWOULDBLOCK WSAEWOULDBLOCK
+#endif
+    ssize_t sent = (slen == 0) ? vlc_send_i11e( fd, buf, len, 0 )
+                               : vlc_sendto_i11e( fd, buf, len, 0, peer, slen );
+    if (sent != -1)
+        return sent;
+
+    const int err = net_errno;
+    if (err == EAGAIN || err == EWOULDBLOCK || err == ENOBUFS || err == ENOMEM)
+        return sent;
+
+    int sock_type;
+    if (getsockopt( fd, SOL_SOCKET, SO_TYPE,
+                    &sock_type, &(socklen_t){ sizeof(sock_type) }) != 0)
+        return sent;
+    if (sock_type != SOCK_DGRAM)
+        return sent;
+
+    /* ICMP soft error: ignore and retry */
+    return (slen == 0) ? vlc_send_i11e( fd, buf, len, 0 )
+                       : vlc_sendto_i11e( fd, buf, len, 0, peer, slen );
+}
+
+/* Convenience wrapper: rist_WriteTo_i11e() without a destination address.
+ * slen is passed as 0, which selects the vlc_send_i11e() path. */
+static inline ssize_t rist_Write_i11e(int fd, const void *buf, size_t len)
+{
+ return rist_WriteTo_i11e(fd, buf, len, NULL, 0);
+}
+
+/**
+ * Datagram write with a single retry.
+ *
+ * Sends with send() when slen is 0, sendto() otherwise. A failure with
+ * EAGAIN, EWOULDBLOCK, ENOBUFS or ENOMEM is returned as is. Any other failure
+ * on a SOCK_DGRAM socket is treated as an ICMP soft error and the send is
+ * attempted once more.
+ *
+ * @return the byte count of the last attempt, or -1 on error
+ */
+static inline ssize_t rist_WriteTo(int fd, const void *buf, size_t len, const struct sockaddr *peer,
+    socklen_t slen)
+{
+#ifdef _WIN32
+    # define ENOBUFS WSAENOBUFS
+    # define EAGAIN WSAEWOULDBLOCK
+    # define EWOULDBLOCK WSAEWOULDBLOCK
+#endif
+    ssize_t sent = (slen == 0) ? send( fd, buf, len, 0 )
+                               : sendto( fd, buf, len, 0, peer, slen );
+    if (sent != -1)
+        return sent;
+
+    const int err = net_errno;
+    if (err == EAGAIN || err == EWOULDBLOCK || err == ENOBUFS || err == ENOMEM)
+        return sent;
+
+    int sock_type;
+    if (getsockopt( fd, SOL_SOCKET, SO_TYPE,
+                    &sock_type, &(socklen_t){ sizeof(sock_type) }) != 0)
+        return sent;
+    if (sock_type != SOCK_DGRAM)
+        return sent;
+
+    /* ICMP soft error: ignore and retry */
+    return (slen == 0) ? send( fd, buf, len, 0 )
+                       : sendto( fd, buf, len, 0, peer, slen );
+}
+
+/* Convenience wrapper: rist_WriteTo() without a destination address.
+ * slen is passed as 0, which selects the send() path. */
+static inline ssize_t rist_Write(int fd, const void *buf, size_t len)
+{
+ return rist_WriteTo(fd, buf, len, NULL, 0);
+}
+
+/**
+ * Tells whether a host designates a multicast group.
+ *
+ * The host is resolved with vlc_getaddrinfo() and the first returned address
+ * is tested. IPv4 uses IN_MULTICAST(); IPv6 checks for the ff00::/8 prefix.
+ * Testing the resolved address rather than the text means the result does
+ * not depend on how the address is written (brackets, letter case, or any
+ * ffXX prefix other than "ff00").
+ *
+ * @param psz_dst_server host name or numeric address; NULL is accepted
+ * @return true for a multicast address; false otherwise, including when the
+ *         host is NULL or cannot be resolved
+ */
+static bool is_multicast_address(char *psz_dst_server)
+{
+    bool ismulticast = false;
+
+    struct addrinfo hint = {
+        .ai_socktype = SOCK_DGRAM,
+        .ai_protocol = IPPROTO_UDP,
+        .ai_flags = AI_NUMERICSERV | AI_IDN | AI_PASSIVE,
+    }, *res;
+
+    /* A URL without a host part yields NULL here. */
+    if (psz_dst_server == NULL)
+        return false;
+
+    if (vlc_getaddrinfo(psz_dst_server, 0, &hint, &res))
+        return false;
+
+    if (res->ai_family == AF_INET) {
+        const struct sockaddr_in *sin = (const void *)res->ai_addr;
+        ismulticast = IN_MULTICAST(ntohl(sin->sin_addr.s_addr)) != 0;
+    } else if (res->ai_family == AF_INET6) {
+        const struct sockaddr_in6 *sin6 = (const void *)res->ai_addr;
+        /* IPv6 multicast is ff00::/8: the first address byte is 0xff. */
+        ismulticast = sin6->sin6_addr.s6_addr[0] == 0xff;
+    }
+
+    freeaddrinfo(res);
+
+    return ismulticast;
+}
\ No newline at end of file
diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
index 26560f400c..498085315b 100644
--- a/modules/access_output/Makefile.am
+++ b/modules/access_output/Makefile.am
@@ -35,3 +35,11 @@ libaccess_output_srt_plugin_la_LIBADD = $(SRT_LIBS) $(LIBPTHREAD)
libaccess_output_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
access_out_LTLIBRARIES += $(LTLIBaccess_output_srt)
EXTRA_LTLIBRARIES += libaccess_output_srt_plugin.la
+
+### RIST ###
+libaccess_output_rist_plugin_la_SOURCES = access_output/rist.c access/rist.h
+libaccess_output_rist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
+libaccess_output_rist_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
+if HAVE_BITSTREAM
+access_out_LTLIBRARIES += libaccess_output_rist_plugin.la
+endif
diff --git a/modules/access_output/rist.c b/modules/access_output/rist.c
new file mode 100644
index 0000000000..0553d206b8
--- /dev/null
+++ b/modules/access_output/rist.c
@@ -0,0 +1,876 @@
+/*****************************************************************************
+ * rist.c: RIST (Reliable Internet Stream Transport) output module
+ *****************************************************************************
+ * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
+ * Copyright (C) 2018-2020, SipRadius LLC
+ *
+ * Authors: Sergio Ammirata <sergio at ammirata.net>
+ * Daniele Lacamera <root at danielinux.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_common.h>
+#include <vlc_interrupt.h>
+#include <vlc_fs.h>
+#include <vlc_plugin.h>
+#include <vlc_sout.h>
+#include <vlc_block.h>
+#include <vlc_network.h>
+#include <vlc_threads.h>
+#include <vlc_rand.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+#include <bitstream/ietf/rtcp_rr.h>
+#include <bitstream/ietf/rtcp_sr.h>
+#include <bitstream/ietf/rtcp_fb.h>
+#include <bitstream/ietf/rtcp_sdes.h>
+#include <bitstream/ietf/rtp.h>
+
+#include "../access/rist.h"
+
+/* Uncomment the following to introduce induced packet loss for TESTING purposes only */
+/*#define TEST_PACKET_LOSS*/
+
+/* The default target packet size */
+#define RIST_TARGET_PACKET_SIZE 1328
+/* The default caching delay for output data */
+#define DEFAULT_CACHING_DELAY 50
+/* The default buffer size in ms */
+#define DEFAULT_BUFFER_SIZE 0
+/* Calculate and print stats once per second */
+#define STATS_INTERVAL 1000 /*ms*/
+
+#define MPEG_II_TRANSPORT_STREAM (0x21)
+#define RIST_DEFAULT_PORT 1968
+
+#define SOUT_CFG_PREFIX "sout-rist-"
+
+/* NULL-terminated list of the option names of this access output, written
+ * without the SOUT_CFG_PREFIX ("sout-rist-") that the configuration variables
+ * carry. NOTE(review): presumably handed to the sout option parser by Open(),
+ * which is outside this excerpt - confirm. */
+static const char *const ppsz_sout_options[] = {
+ "packet-size",
+ "caching",
+ "buffer-size",
+ "ssrc",
+ "stream-name",
+ NULL
+};
+
+/* Private state of the RIST access output. */
+struct sout_access_out_sys_t
+{
+ struct rist_flow *flow; /* sockets, retransmission queue and counters */
+ uint16_t rtp_counter; /* NOTE(review): presumably the next RTP sequence number - confirm */
+ char receiver_name[MAX_CNAME]; /* last CNAME received in an RTCP SDES packet */
+ uint64_t last_rtcp_tx; /* mdate() value of the last RTCP report sent */
+ vlc_thread_t ristthread; /* NOTE(review): presumably runs rist_thread() - confirm in Open() */
+ vlc_thread_t senderthread; /* NOTE(review): presumably runs ThreadSend() - confirm in Open() */
+ size_t i_packet_size; /* NOTE(review): presumably the "packet-size" option - confirm */
+ bool b_mtu_warning;
+ bool b_ismulticast; /* true when an extra multicast RTCP socket is polled */
+ vlc_mutex_t lock; /* protects the flow queue and its counters */
+ vlc_mutex_t fd_lock; /* serialises writes to flow->fd_out */
+ block_t *p_pktbuffer;
+ uint64_t i_ticks_caching; /* delay added to each block's dts before it is sent */
+ uint32_t ssrc; /* SSRC written in the RTCP sender report */
+ block_fifo_t *p_fifo; /* blocks waiting to be sent by ThreadSend() */
+ /* stats variables */
+ uint64_t i_last_stat; /* mdate() value of the last statistics printout */
+ uint32_t i_retransmit_packets; /* retransmissions since the last printout */
+ uint32_t i_total_packets;
+};
+
+/**
+ * Allocates a transmit flow: a zeroed struct rist_flow with an empty queue
+ * of RIST_QUEUE_SIZE slots, the reset flag raised and no socket open.
+ *
+ * @return the new flow, or NULL if either allocation fails
+ */
+static struct rist_flow *rist_init_tx(void)
+{
+    struct rist_flow *flow = calloc(1, sizeof(*flow));
+    if (!flow)
+        return NULL;
+
+    flow->reset = 1;
+    flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(*flow->buffer));
+    if ( unlikely( flow->buffer == NULL ) )
+    {
+        free(flow);
+        return NULL;
+    }
+    /* calloc() leaves the descriptors at 0, which is a valid fd: mark every
+     * socket as closed, including the ones this module never opens. */
+    flow->fd_in = -1;
+    flow->fd_out = -1;
+    flow->fd_rtcp = -1;
+    flow->fd_rtcp_m = -1;
+    flow->fd_nack = -1;
+
+    return flow;
+}
+
+/**
+ * Creates the transmit flow and opens its sockets.
+ *
+ * Data goes out on i_dst_port; RTCP uses i_dst_port + 1. For multicast an
+ * extra RTCP socket is opened on the group with net_OpenDgram(). The CNAME
+ * comes from the "stream-name" option when it is set and non-empty,
+ * otherwise from populate_cname().
+ *
+ * @return the flow, or NULL on failure (sockets already opened are closed)
+ */
+static struct rist_flow *rist_udp_transmitter(sout_access_out_t *p_access, char *psz_dst_server,
+    int i_dst_port, bool b_ismulticast)
+{
+    struct rist_flow *flow;
+    flow = rist_init_tx();
+    if (!flow)
+        return NULL;
+
+    flow->fd_out = net_ConnectDgram(p_access, psz_dst_server, i_dst_port, -1, IPPROTO_UDP );
+    if (flow->fd_out < 0)
+    {
+        msg_Err( p_access, "cannot open output socket" );
+        goto fail;
+    }
+
+    if (b_ismulticast) {
+        flow->fd_rtcp_m = net_OpenDgram(p_access, psz_dst_server, i_dst_port + 1,
+            NULL, 0, IPPROTO_UDP);
+        if (flow->fd_rtcp_m < 0)
+        {
+            msg_Err( p_access, "cannot open multicast nack socket" );
+            goto fail;
+        }
+    }
+
+    flow->fd_rtcp = net_ConnectDgram(p_access, psz_dst_server, i_dst_port + 1, -1, IPPROTO_UDP );
+    if (flow->fd_rtcp < 0)
+    {
+        msg_Err( p_access, "cannot open nack socket" );
+        goto fail;
+    }
+
+    char *psz_streamname = var_InheritString( p_access, SOUT_CFG_PREFIX "stream-name" );
+    if ( psz_streamname != NULL && psz_streamname[0] != '\0')
+        /* snprintf() truncates to MAX_CNAME - 1 characters and terminates. */
+        snprintf(flow->cname, MAX_CNAME, "%s", psz_streamname);
+    else
+        populate_cname(flow->fd_rtcp, flow->cname);
+    /* Freed on every path: an empty option value is still an allocation. */
+    free( psz_streamname );
+
+    msg_Info(p_access, "our cname is %s", flow->cname);
+
+    return flow;
+
+fail:
+    /* net_Close() is the socket-aware close, as used by the input module. */
+    if (flow->fd_out != -1)
+        net_Close(flow->fd_out);
+    if (flow->fd_rtcp != -1)
+        net_Close(flow->fd_rtcp);
+    if (flow->fd_rtcp_m != -1)
+        net_Close(flow->fd_rtcp_m);
+    free(flow->buffer);
+    free(flow);
+    return NULL;
+}
+
+/**
+ * Re-sends the queued packet whose RTP sequence number is seq.
+ *
+ * Nothing is sent when the slot is empty, or when rtp_latency is non-zero and
+ * the packet is older than it, measured against the newest packet sent.
+ * The write to fd_out is serialised with fd_lock.
+ */
+static void rist_retransmit(sout_access_out_t *p_access, struct rist_flow *flow, uint16_t seq)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    struct rtp_pkt *slot = flow->buffer + seq;
+    block_t *stored = slot->buffer;
+
+    if (stored == NULL)
+    {
+        msg_Err(p_access, "RIST recovery: missing requested packet %d, buffer not yet full", seq);
+        return;
+    }
+
+    /* Flag the packet as a retransmission by setting the lowest bit of the
+     * last SSRC byte (byte 11 of the RTP header). */
+    stored->p_buffer[11] |= (1 << 0);
+#ifdef TEST_PACKET_LOSS
+# warning COMPILED WITH SELF INFLICTED PACKET LOSS
+    if ((flow->packets_count % 14) == 0) {
+        return;
+    }
+#endif
+    const uint32_t rtp_age = flow->hi_timestamp - slot->rtp_ts;
+    const uint64_t age = ts_get_from_rtp(rtp_age)/1000;
+
+    if (flow->rtp_latency > 0 && rtp_age > flow->rtp_latency)
+    {
+        msg_Err(p_access, " Not Sending Nack #%d, too old (age %"PRId64" ms), current seq is:" \
+            " [%d]. Perhaps you should increase the buffer-size ...", seq, age, flow->wi);
+        return;
+    }
+
+    msg_Dbg(p_access, " Sending Nack #%d (age %"PRId64" ms), current seq is: [%d]",
+        seq, age, flow->wi);
+    p_sys->i_retransmit_packets++;
+
+    vlc_mutex_lock( &p_sys->fd_lock );
+    const ssize_t written = rist_Write(flow->fd_out, stored->p_buffer, stored->i_buffer);
+    if (written != (ssize_t)stored->i_buffer) {
+        msg_Err(p_access, "Error sending retransmitted packet after 2 tries ...");
+    }
+    vlc_mutex_unlock( &p_sys->fd_lock );
+}
+
+/**
+ * Handles one RTCP feedback packet that requests retransmissions.
+ *
+ * Two formats are understood:
+ *  - RTCP_PT_RTPFR: range nacks (start sequence + count of following packets),
+ *    accepted only when the media-source field carries the name "RIST";
+ *  - RTCP_PT_RTPFB: bitmask nacks (packet id + 16-bit mask of the following
+ *    packets).
+ * Each requested packet is passed to rist_retransmit() with p_sys->lock held.
+ *
+ * @param ptype    RTCP payload type of the packet
+ * @param nrecords RTCP length field as read by the caller; records start
+ *                 12 bytes into the packet, hence nrecords - 2 of them
+ * @param pkt      start of the RTCP packet
+ */
+static void process_nack(sout_access_out_t *p_access, uint8_t ptype, uint16_t nrecords,
+    struct rist_flow *flow, uint8_t *pkt)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+
+    if (ptype != RTCP_PT_RTPFR && ptype != RTCP_PT_RTPFB)
+    {
+        msg_Err(p_access, " !!! Wrong feedback. Ptype is %02x!=%02x, FMT: %02x", ptype,
+            RTCP_PT_RTPFR, rtcp_fb_get_fmt(pkt));
+        return;
+    }
+
+    /* A length below 2 means the packet ends before the 12-byte feedback
+     * header: there is no media-source field to read and no record. */
+    if (nrecords < 2)
+        return;
+
+    const int record_count = nrecords - 2;
+
+    if (ptype == RTCP_PT_RTPFR)
+    {
+        uint8_t pi_ssrc[4];
+        rtcp_fb_get_ssrc_media_src(pkt, pi_ssrc);
+        if (memcmp(pi_ssrc, "RIST", 4) != 0)
+        {
+            /* pi_ssrc is 4 raw bytes without a terminator: bound the print. */
+            msg_Info(p_access, " Ignoring Nack with name %.4s", (const char *)pi_ssrc);
+            return; /* Ignore app-type not RIST */
+        }
+
+        for (int i = 0; i < record_count; i++) {
+            uint8_t *rtp_nack_record = (pkt + 12 + i * 4);
+            uint16_t missing = rtcp_fb_nack_get_range_start(rtp_nack_record);
+            uint16_t additional = rtcp_fb_nack_get_range_extra(rtp_nack_record);
+
+            vlc_mutex_lock( &p_sys->lock );
+            rist_retransmit(p_access, flow, missing);
+            /* Sequence numbers wrap around naturally in the uint16_t argument. */
+            for (int j = 0; j < additional; j++) {
+                rist_retransmit(p_access, flow, missing + j + 1);
+            }
+            vlc_mutex_unlock( &p_sys->lock );
+        }
+    }
+    else
+    {
+        for (int i = 0; i < record_count; i++) {
+            uint8_t *rtp_nack_record = (pkt + 12 + i * 4);
+            uint16_t missing = rtcp_fb_nack_get_packet_id(rtp_nack_record);
+            uint16_t bitmask = rtcp_fb_nack_get_bitmask_lost(rtp_nack_record);
+
+            vlc_mutex_lock( &p_sys->lock );
+            rist_retransmit(p_access, flow, missing);
+            /* Bit j set means packet missing + j + 1 is lost as well. */
+            for (int j = 0; j < 16; j++) {
+                if ((bitmask & (1 << j)) == (1 << j)) {
+                    rist_retransmit(p_access, flow, missing + j + 1);
+                }
+            }
+            vlc_mutex_unlock( &p_sys->lock );
+        }
+    }
+}
+
+/**
+ * Parses a received RTCP datagram, which may hold several RTCP packets back
+ * to back, and dispatches each of them.
+ *
+ * Feedback packets go to process_nack(). For unicast, an SDES packet updates
+ * the stored receiver name. RR and SR packets are ignored. Parsing stops at
+ * the first malformed packet.
+ *
+ * @param pkt_raw the datagram as received
+ * @param len     number of valid bytes in pkt_raw
+ */
+static void rist_rtcp_recv(sout_access_out_t *p_access, struct rist_flow *flow, uint8_t *pkt_raw,
+    size_t len)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    size_t processed_bytes = 0;
+
+    while (processed_bytes < len) {
+        uint8_t *pkt = pkt_raw + processed_bytes;
+        /* safety checks */
+        /* Exactly what remains: counting one byte too many would let a
+         * truncated packet pass the checks below. */
+        const size_t bytes_left = len - processed_bytes;
+        if ( bytes_left < 4 )
+        {
+            /* we must have at least 4 bytes */
+            msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d",
+                (int)bytes_left);
+            return;
+        }
+        else if (!rtp_check_hdr(pkt))
+        {
+            /* check for a valid rtp header */
+            msg_Err(p_access, "Malformed feedback packet starting with %02x, ignoring.", pkt[0]);
+            return;
+        }
+
+        const uint8_t ptype = rtcp_get_pt(pkt);
+        const uint16_t records = rtcp_get_length(pkt);
+        /* Computed in size_t: narrowed to 16 bits, a large length field wraps
+         * to 0 and the loop would never advance. */
+        const size_t bytes = 4 * ((size_t)records + 1);
+        if (bytes > bytes_left)
+        {
+            /* check for a sane number of bytes */
+            msg_Err(p_access, "Malformed feedback packet, wrong len %d, expecting %u bytes in the" \
+                " packet, got a buffer of %u bytes. ptype = %d", records, (unsigned)bytes,
+                (unsigned)bytes_left, ptype);
+            return;
+        }
+
+        switch(ptype) {
+            case RTCP_PT_RTPFR:
+            case RTCP_PT_RTPFB:
+                process_nack(p_access, ptype, records, flow, pkt);
+                break;
+
+            case RTCP_PT_RR:
+                /* Receiver reports are not used. */
+                break;
+
+            case RTCP_PT_SDES:
+            {
+                if (p_sys->b_ismulticast == false)
+                {
+                    /* The name follows the fixed RTCP_SDES_SIZE header and
+                     * must lie entirely inside this RTCP packet. */
+                    size_t name_length = 0;
+                    if (bytes >= RTCP_SDES_SIZE)
+                        name_length = (uint8_t)rtcp_sdes_get_name_length(pkt);
+                    if (bytes < RTCP_SDES_SIZE || RTCP_SDES_SIZE + name_length > bytes)
+                    {
+                        /* check for a sane number of bytes */
+                        msg_Err(p_access, "Malformed SDES packet, wrong cname len %u, got a " \
+                            "buffer of %u bytes.", (unsigned)name_length, (unsigned)bytes_left);
+                        return;
+                    }
+                    /* Never write past receiver_name; keep room for the NUL. */
+                    if (name_length > MAX_CNAME - 1)
+                        name_length = MAX_CNAME - 1;
+
+                    const uint8_t *name = pkt + RTCP_SDES_SIZE;
+                    if (memcmp(name, p_sys->receiver_name, name_length) != 0
+                     || p_sys->receiver_name[name_length] != '\0')
+                    {
+                        memcpy(p_sys->receiver_name, name, name_length);
+                        p_sys->receiver_name[name_length] = '\0';
+                        msg_Info(p_access, "Receiver name: %s", p_sys->receiver_name);
+                    }
+                }
+            }
+            break;
+
+            case RTCP_PT_SR:
+                break;
+
+            default:
+                msg_Err(p_access, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
+        }
+        processed_bytes += bytes;
+    }
+}
+
+/**
+ * Sends one compound RTCP message on fd_rtcp: a sender report (NTP time,
+ * RTP time, packet and byte counters) followed by an SDES packet carrying
+ * the flow CNAME. The result of send() is deliberately ignored.
+ */
+static void rist_rtcp_send(sout_access_out_t *p_access)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+    /* The CNAME field is padded up to the next 4n + 2 length so the SDES
+     * packet ends on a 32-bit boundary. That can exceed MAX_CNAME by up to
+     * 2 bytes, so the buffer keeps spare room: send() must not read past it. */
+    uint8_t rtcp_buf[RTCP_SR_SIZE + RTCP_SDES_SIZE + MAX_CNAME + 4] = { 0 };
+    struct timeval tv;
+    int r;
+    uint64_t fractions;
+    const size_t cname_len = strlen(flow->cname);
+    uint16_t namelen = cname_len + 1;
+    gettimeofday(&tv, NULL);
+
+    /* Populate SR for sender report */
+    uint8_t *p_sr = rtcp_buf;
+    rtp_set_hdr(p_sr);
+    rtcp_sr_set_pt(p_sr);
+    rtcp_sr_set_length(p_sr, 6);
+    rtcp_fb_set_int_ssrc_pkt_sender(p_sr, p_sys->ssrc);
+    rtcp_sr_set_ntp_time_msw(p_sr, tv.tv_sec + SEVENTY_YEARS_OFFSET);
+    /* Microseconds scaled to a 32-bit binary fraction of a second. */
+    fractions = (uint64_t)tv.tv_usec;
+    fractions <<= 32ULL;
+    fractions /= 1000000ULL;
+    rtcp_sr_set_ntp_time_lsw(p_sr, (uint32_t)fractions);
+    rtcp_sr_set_rtp_time(p_sr, rtp_get_ts(mdate()));
+    vlc_mutex_lock( &p_sys->lock );
+    rtcp_sr_set_packet_count(p_sr, flow->packets_count);
+    rtcp_sr_set_octet_count(p_sr, flow->bytes_count);
+    vlc_mutex_unlock( &p_sys->lock );
+
+    /* Populate SDES for sender description */
+    uint8_t *p_sdes = (rtcp_buf + RTCP_SR_SIZE);
+    /* we need to make sure it is a multiple of 4, pad if necessary */
+    if ((namelen - 2) & 0x3)
+        namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
+    rtp_set_hdr(p_sdes);
+    rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
+    rtcp_sdes_set_pt(p_sdes);
+    rtcp_set_length(p_sdes, (namelen >> 2) + 2);
+    rtcp_sdes_set_cname(p_sdes, 1);
+    rtcp_sdes_set_name_length(p_sdes, cname_len);
+    p_sdes += RTCP_SDES_SIZE;
+    /* The padding bytes stay zero from the buffer initialisation. */
+    strlcpy((char *)p_sdes, flow->cname, namelen);
+
+    /* Send the rtcp message */
+    r = send(flow->fd_rtcp, rtcp_buf, RTCP_SR_SIZE + RTCP_SDES_SIZE + namelen, 0);
+    (void)r;
+}
+
+/**
+ * RTCP worker thread of the output.
+ *
+ * Polls the RTCP socket (plus the multicast RTCP socket when there is one)
+ * with a timeout of half RTCP_INTERVAL, feeds every datagram received to
+ * rist_rtcp_recv(), and sends a report with rist_rtcp_send() whenever more
+ * than RTCP_INTERVAL has elapsed since the previous one.
+ * Cancellation is disabled outside of poll(). The loop never exits on its
+ * own; the thread ends by being cancelled.
+ *
+ * @param data the sout_access_out_t
+ * @return NULL (never reached)
+ */
+static void *rist_thread(void *data)
+{
+    sout_access_out_t *p_access = data;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    uint64_t now;
+    uint8_t pkt[RTP_PKT_SIZE];
+    struct pollfd pfd[2];
+    int ret;
+    ssize_t r;
+
+    int poll_sockets = 1;
+    pfd[0].fd = p_sys->flow->fd_rtcp;
+    pfd[0].events = POLLIN;
+    if (p_sys->b_ismulticast)
+    {
+        pfd[1].fd = p_sys->flow->fd_rtcp_m;
+        pfd[1].events = POLLIN;
+        poll_sockets++;
+    }
+
+    for (;;) {
+        ret = poll(pfd, poll_sockets, RTCP_INTERVAL >> 1);
+        int canc = vlc_savecancel();
+        if (ret > 0)
+        {
+            /* Entry 0 is the unicast RTCP socket, entry 1 (if polled) the
+             * multicast one; both are handled the same way. */
+            for (int i = 0; i < poll_sockets; i++)
+            {
+                if (!(pfd[i].revents & POLLIN))
+                    continue;
+
+                r = rist_Read(pfd[i].fd, pkt, RTP_PKT_SIZE);
+                if (r == RTP_PKT_SIZE) {
+                    msg_Err(p_access, "Rist RTCP messsage is too big (%zd bytes) and was probably " \
+                        "cut, please keep it under %d bytes", r, RTP_PKT_SIZE);
+                }
+                if (unlikely(r == -1)) {
+                    /* errno is decoded with the errno decoder; gai_strerror()
+                     * only understands getaddrinfo() error codes. */
+                    msg_Err(p_access, "%ssocket %d error: %s\n", (i == 0) ? "" : "mcast ",
+                        pfd[i].fd, vlc_strerror_c(errno));
+                }
+                else {
+                    rist_rtcp_recv(p_access, p_sys->flow, pkt, r);
+                }
+            }
+        }
+
+        /* And, in any case: */
+        now = mdate();
+        if ((now - p_sys->last_rtcp_tx) > RIST_TICK_FROM_MS(RTCP_INTERVAL))
+        {
+            rist_rtcp_send(p_access);
+            p_sys->last_rtcp_tx = now;
+        }
+        vlc_restorecancel (canc);
+    }
+
+    return NULL;
+}
+
+/****************************************************************************
+ * RTP send
+ ****************************************************************************/
+/**
+ * Sender thread.
+ *
+ * For every block taken from the FIFO: wait until its DTS plus the configured
+ * caching delay, write it to the data socket, then store it in the flow's
+ * packet buffer at the index given by its RTP sequence number (replacing
+ * whatever was there). Once per STATS_INTERVAL the retransmission statistics
+ * are logged and reset.
+ *
+ * \param data the sout_access_out_t this thread serves
+ * \return NULL (never reached, the loop only ends through cancellation)
+ */
+static void* ThreadSend( void *data )
+{
+    sout_access_out_t *p_access = data;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+    const uint64_t i_caching = p_sys->i_ticks_caching;
+
+    for (;;)
+    {
+        block_t *out = block_FifoGet( p_sys->p_fifo );
+
+        /* A cleanup handler is registered for the block while we sleep */
+        block_cleanup_push( out );
+        mwait (out->i_dts + (mtime_t)i_caching);
+        vlc_cleanup_pop();
+
+        const ssize_t len = out->i_buffer;
+        int canc = vlc_savecancel();
+
+        const uint16_t seq = rtp_get_seqnum(out->p_buffer);
+        const uint32_t pkt_ts = rtp_get_timestamp(out->p_buffer);
+
+#ifdef TEST_PACKET_LOSS
+# warning COMPILED WITH SELF INFLICTED PACKET LOSS
+        /* Test builds skip the write for one sequence number out of 14 */
+        const bool b_drop = (seq % 14) == 0;
+#else
+        const bool b_drop = false;
+#endif
+
+        vlc_mutex_lock( &p_sys->fd_lock );
+        if (!b_drop && rist_Write(flow->fd_out, out->p_buffer, len) != len)
+            msg_Err(p_access, "Error sending data packet after 2 tries ...");
+        vlc_mutex_unlock( &p_sys->fd_lock );
+
+        /* Keep the packet in the flow buffer; the slot's previous occupant
+         * is always released and replaced. */
+        vlc_mutex_lock( &p_sys->lock );
+        struct rtp_pkt *slot = &flow->buffer[seq];
+        if (slot->buffer != NULL)
+            block_Release(slot->buffer);
+        slot->buffer = out;
+        slot->rtp_ts = pkt_ts;
+
+        if (flow->reset == 1)
+        {
+            /* First packet in the queue */
+            msg_Info(p_access, "Traffic detected");
+            flow->reset = 0;
+        }
+        flow->wi = seq;
+        flow->last_out = seq;
+        flow->hi_timestamp = pkt_ts;
+        /* Counters used for the RTCP feedback packets */
+        flow->packets_count++;
+        flow->bytes_count += len;
+        vlc_mutex_unlock( &p_sys->lock );
+
+        /* Periodic statistics */
+        const uint64_t now = mdate();
+        if ( (now - p_sys->i_last_stat) > RIST_TICK_FROM_MS(STATS_INTERVAL) )
+        {
+            if (p_sys->i_retransmit_packets > 0)
+            {
+                const float quality = (p_sys->i_total_packets > 0)
+                    ? 100.f - 100.f * (float)p_sys->i_retransmit_packets
+                        / (float)p_sys->i_total_packets
+                    : 100.f;
+                msg_Info(p_access, "STATS: Total %u, Retransmitted %u, Link Quality %.2f%%",
+                    p_sys->i_total_packets, p_sys->i_retransmit_packets, quality);
+            }
+            p_sys->i_total_packets = 0;
+            p_sys->i_retransmit_packets = 0;
+            p_sys->i_last_stat = now;
+        }
+        p_sys->i_total_packets++;
+
+        vlc_restorecancel (canc);
+    }
+    return NULL;
+}
+
+/**
+ * Stamps a fresh RTP header onto the staging buffer and queues a copy of it
+ * for the sender thread.
+ *
+ * The header (payload type, next sequence number, SSRC, timestamp derived
+ * from the buffer's DTS) is written in place at the start of \p buffer; the
+ * buffer itself stays owned by the caller, only a duplicate is queued.
+ *
+ * \param p_access the access output
+ * \param buffer   staging buffer holding the RTP header room and the payload
+ */
+static void SendtoFIFO( sout_access_out_t *p_access, block_t *buffer )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    uint16_t seq = p_sys->rtp_counter++;
+
+    /* Set fresh rtp header data */
+    uint8_t *bufhdr = buffer->p_buffer;
+    rtp_set_hdr(bufhdr);
+    rtp_set_type(bufhdr, MPEG_II_TRANSPORT_STREAM);
+    rtp_set_seqnum(bufhdr, seq);
+    rtp_set_int_ssrc(bufhdr, p_sys->ssrc);
+    uint32_t pkt_ts = rtp_get_ts(buffer->i_dts);
+    rtp_set_timestamp(bufhdr, pkt_ts);
+
+    block_t *pkt = block_Duplicate(buffer);
+    if( unlikely(pkt == NULL) )
+    {
+        /* Out of memory: report the lost packet instead of dropping it
+         * silently. Its sequence number stays consumed. */
+        msg_Err(p_access, "Out of memory, dropping packet with seq number %d", seq);
+        return;
+    }
+    block_FifoPut( p_sys->p_fifo, pkt );
+}
+
+/**
+ * Access-output write callback: packs the incoming block chain into RTP
+ * packets of at most the configured packet size and queues them.
+ *
+ * Payload is accumulated in the staging buffer after the RTP header room.
+ * A block that still fits is appended; a block that does not fit causes the
+ * staged data to be flushed first, so that blocks are only split when a
+ * single block is larger than one packet's payload room.
+ *
+ * The copy length is bounded by the room left in the staging buffer, so the
+ * buffer (allocated with the configured packet size) is never overrun, and
+ * a staging buffer holding nothing but the header is never flushed.
+ *
+ * \param p_access the access output
+ * \param p_buffer chain of blocks to send; always consumed
+ * \return number of payload bytes accepted, or -1 if the configured packet
+ *         size leaves no room for payload
+ */
+static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    block_t *p_pkt = p_sys->p_pktbuffer;
+    int i_len = 0;
+
+    /* Without at least one byte of payload room the copy loop below could
+     * never make progress. */
+    if( unlikely(p_sys->i_packet_size <= RTP_HEADER_SIZE) )
+    {
+        msg_Err( p_access, "Configured packet size (%zu) leaves no room for payload",
+                 p_sys->i_packet_size );
+        block_ChainRelease( p_buffer );
+        return -1;
+    }
+
+    while( p_buffer )
+    {
+        block_t *p_next;
+        int i_block_split = 0;
+
+        if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_packet_size )
+        {
+            msg_Warn( p_access, "Buffer data size (%zu) > configured packet size (%zu), you "
+                "should probably increase the configured packet size", p_buffer->i_buffer,
+                p_sys->i_packet_size );
+            p_sys->b_mtu_warning = true;
+        }
+
+        /* The new block does not fit behind the staged payload: flush it.
+         * Nothing is flushed when only the header is staged, that would
+         * emit an empty RTP packet. */
+        if( p_pkt->i_buffer > RTP_HEADER_SIZE
+         && p_pkt->i_buffer + p_buffer->i_buffer > p_sys->i_packet_size )
+        {
+            SendtoFIFO(p_access, p_pkt);
+            p_pkt->i_buffer = RTP_HEADER_SIZE;
+        }
+
+        i_len += p_buffer->i_buffer;
+
+        while( p_buffer->i_buffer )
+        {
+            /* Here p_pkt->i_buffer < i_packet_size always holds, so there
+             * is at least one byte of room. */
+            size_t i_room = p_sys->i_packet_size - p_pkt->i_buffer;
+            size_t i_write = __MIN( p_buffer->i_buffer, i_room );
+
+            i_block_split++;
+
+            /* A packet takes the DTS of the first payload it carries */
+            if( p_pkt->i_buffer == RTP_HEADER_SIZE )
+            {
+                p_pkt->i_dts = p_buffer->i_dts;
+            }
+
+            memcpy( p_pkt->p_buffer + p_pkt->i_buffer, p_buffer->p_buffer, i_write );
+
+            p_pkt->i_buffer += i_write;
+            p_buffer->p_buffer += i_write;
+            p_buffer->i_buffer -= i_write;
+
+            /* Flush when the packet is full. Also flush the tail of a block
+             * that had to be split (i_block_split > 1) so that it is not
+             * merged with the following block. */
+            if( p_pkt->i_buffer == p_sys->i_packet_size || i_block_split > 1 )
+            {
+                SendtoFIFO(p_access, p_pkt);
+                p_pkt->i_buffer = RTP_HEADER_SIZE;
+            }
+        }
+
+        p_next = p_buffer->p_next;
+        block_Release( p_buffer );
+        p_buffer = p_next;
+    }
+
+    return i_len;
+}
+
+/**
+ * Access-output control callback.
+ *
+ * Only ACCESS_OUT_CONTROLS_PACE is handled: the bool pointed to by the next
+ * variadic argument is set to false. Every other query is rejected.
+ *
+ * \param p_access unused
+ * \param i_query  control query identifier
+ * \param args     query arguments
+ * \return VLC_SUCCESS for a handled query, VLC_EGENERIC otherwise
+ */
+static int Control( sout_access_out_t *p_access, int i_query, va_list args )
+{
+    VLC_UNUSED( p_access );
+
+    if( i_query != ACCESS_OUT_CONTROLS_PACE )
+        return VLC_EGENERIC;
+
+    bool *pb_paced = va_arg( args, bool * );
+    *pb_paced = false;
+    return VLC_SUCCESS;
+}
+
+/**
+ * Releases everything held by the module instance: the FIFO, the flow (its
+ * three sockets, every block still stored in its packet buffer, and the
+ * flow itself), both mutexes and the staging packet buffer.
+ *
+ * Threads must already be stopped by the caller.
+ *
+ * \param p_access the access output; p_access->p_sys must be valid
+ */
+static void Clean( sout_access_out_t *p_access )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+
+    if( likely(p_sys->p_fifo != NULL) )
+        block_FifoRelease( p_sys->p_fifo );
+
+    if( flow != NULL )
+    {
+        /* Data socket, RTCP socket, multicast RTCP socket */
+        const int fds[] = { flow->fd_out, flow->fd_rtcp, flow->fd_rtcp_m };
+        for( size_t n = 0; n < sizeof (fds) / sizeof (fds[0]); n++ )
+        {
+            if( fds[n] >= 0 )
+                net_Close( fds[n] );
+        }
+
+        /* Drop every packet still kept in the flow buffer */
+        for( int idx = 0; idx < RIST_QUEUE_SIZE; ++idx )
+        {
+            struct rtp_pkt *slot = &flow->buffer[idx];
+            if( slot->buffer == NULL )
+                continue;
+            block_Release( slot->buffer );
+            slot->buffer = NULL;
+        }
+        free( flow->buffer );
+        free( flow );
+    }
+
+    vlc_mutex_destroy( &p_sys->lock );
+    vlc_mutex_destroy( &p_sys->fd_lock );
+
+    if( p_sys->p_pktbuffer != NULL )
+        block_Release( p_sys->p_pktbuffer );
+}
+
+/**
+ * Module close callback: cancels both worker threads, waits for them to
+ * terminate, then releases all resources through Clean().
+ *
+ * \param p_this the sout_access_out_t being closed
+ */
+static void Close( vlc_object_t * p_this )
+{
+    sout_access_out_t *p_access = (sout_access_out_t*)p_this;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    /* RTCP worker first, sender second, for both cancel and join */
+    vlc_thread_t *const workers[] = { &p_sys->ristthread, &p_sys->senderthread };
+    const size_t i_workers = sizeof (workers) / sizeof (workers[0]);
+
+    /* Request cancellation of every thread before joining any of them */
+    for( size_t n = 0; n < i_workers; n++ )
+        vlc_cancel( *workers[n] );
+
+    for( size_t n = 0; n < i_workers; n++ )
+        vlc_join( *workers[n], NULL );
+
+    Clean( p_access );
+}
+
+/**
+ * Module open callback.
+ *
+ * Parses "host[:port]" (with optional [bracketed] host) from the access
+ * path, creates the RIST flow towards it, reads the module options, allocates
+ * the FIFO and the staging packet buffer, then starts the sender and the
+ * RTCP worker threads.
+ *
+ * \param p_this the sout_access_out_t being opened
+ * \return VLC_SUCCESS on success, VLC_ENOMEM on early allocation failure,
+ *         VLC_EGENERIC on any later failure (all resources are released)
+ */
+static int Open( vlc_object_t *p_this )
+{
+    sout_access_out_t *p_access = (sout_access_out_t*)p_this;
+    sout_access_out_sys_t *p_sys = NULL;
+
+    if (var_Create ( p_access, "dst-port", VLC_VAR_INTEGER )
+     || var_Create ( p_access, "src-port", VLC_VAR_INTEGER )
+     || var_Create ( p_access, "dst-addr", VLC_VAR_STRING )
+     || var_Create ( p_access, "src-addr", VLC_VAR_STRING ) )
+    {
+        msg_Err( p_access, "Valid network information is required." );
+        return VLC_ENOMEM;
+    }
+
+    config_ChainParse( p_access, SOUT_CFG_PREFIX, ppsz_sout_options, p_access->p_cfg );
+
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+    if( unlikely( p_sys == NULL ) )
+        return VLC_ENOMEM;
+
+    int i_dst_port = RIST_DEFAULT_PORT;
+    char *psz_dst_addr;
+    char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
+    if( !psz_dst_addr )
+        return VLC_ENOMEM;
+
+    /* Skip a bracketed host so that its colons are not taken for the
+     * port separator */
+    if ( psz_parser[0] == '[' )
+        psz_parser = strchr( psz_parser, ']' );
+
+    psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
+    if ( psz_parser != NULL )
+    {
+        *psz_parser++ = '\0';
+        i_dst_port = atoi( psz_parser );
+    }
+
+    vlc_mutex_init( &p_sys->lock );
+    vlc_mutex_init( &p_sys->fd_lock );
+
+    /* Clean() reaches the state through p_access->p_sys: publish it before
+     * the first "goto failed", otherwise the error path dereferences NULL. */
+    p_access->p_sys = p_sys;
+
+    msg_Info(p_access, "Connecting RIST output to %s:%d and %s:%d", psz_dst_addr, i_dst_port,
+        psz_dst_addr, i_dst_port+1);
+    p_sys->b_ismulticast = is_multicast_address(psz_dst_addr);
+    struct rist_flow *flow = rist_udp_transmitter(p_access, psz_dst_addr, i_dst_port,
+        p_sys->b_ismulticast);
+    free (psz_dst_addr);
+    if (!flow)
+        goto failed;
+
+    p_sys->flow = flow;
+    flow->latency = var_InheritInteger(p_access, SOUT_CFG_PREFIX "buffer-size");
+    flow->rtp_latency = rtp_get_ts(RIST_TICK_FROM_MS(flow->latency));
+    p_sys->ssrc = var_InheritInteger(p_access, SOUT_CFG_PREFIX "ssrc");
+    if (p_sys->ssrc == 0) {
+        vlc_rand_bytes(&p_sys->ssrc, 4);
+    }
+    /* Last bit of ssrc must be 0 for normal data and 1 for retries */
+    p_sys->ssrc &= ~(1 << 0);
+
+    msg_Info(p_access, "SSRC: 0x%08X", p_sys->ssrc);
+    p_sys->i_ticks_caching = RIST_TICK_FROM_MS(var_InheritInteger( p_access,
+        SOUT_CFG_PREFIX "caching"));
+
+    /* The staging buffer always starts with the RTP header, so a packet
+     * must be strictly larger than that header. */
+    int64_t i_packet_size = var_InheritInteger(p_access, SOUT_CFG_PREFIX "packet-size" );
+    if( i_packet_size <= (int64_t)RTP_HEADER_SIZE )
+    {
+        msg_Err(p_access, "Invalid packet size %lld, it must be larger than the RTP "
+            "header (%d bytes)", (long long)i_packet_size, (int)RTP_HEADER_SIZE);
+        goto failed;
+    }
+    p_sys->i_packet_size = i_packet_size;
+
+    p_sys->p_fifo = block_FifoNew();
+    if( unlikely(p_sys->p_fifo == NULL) )
+        goto failed;
+    p_sys->p_pktbuffer = block_Alloc( p_sys->i_packet_size );
+    if( unlikely(p_sys->p_pktbuffer == NULL) )
+        goto failed;
+
+    p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
+
+    if( vlc_clone(&p_sys->senderthread, ThreadSend, p_access, VLC_THREAD_PRIORITY_HIGHEST ) )
+    {
+        msg_Err(p_access, "Failed to create sender thread.");
+        goto failed;
+    }
+
+    if (vlc_clone(&p_sys->ristthread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
+    {
+        msg_Err(p_access, "Failed to create worker thread.");
+        vlc_cancel(p_sys->senderthread);
+        vlc_join(p_sys->senderthread, NULL);
+        goto failed;
+    }
+
+    p_access->pf_write = Write;
+    p_access->pf_control = Control;
+
+    return VLC_SUCCESS;
+
+failed:
+    Clean( p_access );
+    /* The state has been torn down, do not leave a pointer to it behind */
+    p_access->p_sys = NULL;
+    return VLC_EGENERIC;
+}
+
+/* Help texts for the module options declared in the descriptor below.
+ * N_() marks the strings for translation. */
+
+/* "caching": Open() converts this value from milliseconds to ticks and the
+ * sender thread adds it to each packet's DTS before sending. */
+#define CACHING_TEXT N_("RIST data output caching size (ms)")
+#define CACHING_LONGTEXT N_( \
+ "Having this cache will guarantee that the packets going out are " \
+ "delivered at a spacing determined by the chain timestamps thus ensuring " \
+ "a near jitter free output. Be aware that this setting will also add to " \
+ "the overall latency of the stream." )
+
+/* "buffer-size": Open() stores this value as the flow latency.
+ * NOTE(review): the text mentions a default of 0 while the option default is
+ * DEFAULT_BUFFER_SIZE (defined elsewhere) - confirm that the two agree. */
+#define BUFFER_TEXT N_("RIST retry-buffer queue size (ms)")
+#define BUFFER_LONGTEXT N_( \
+ "This must match the buffer size (latency) configured on the server side. If you " \
+ "are not sure, leave the default of 0 which will set it the maximum " \
+ "value and will use about 100MB of RAM" )
+
+/* "ssrc": when 0, Open() fills the SSRC with random bytes; its lowest bit is
+ * always cleared afterwards. */
+#define SSRC_TEXT N_("SSRC used in RTP output (default is random, i.e. 0)")
+#define SSRC_LONGTEXT N_( \
+ "Use this setting to specify a known SSRC for the RTP header. This is only useful " \
+ "if your receiver acts on it. When using VLC as receiver, it is not." )
+
+/* "stream-name": not read by the code visible in this part of the file;
+ * presumably consumed when RTCP packets are built - TODO confirm. */
+#define NAME_TEXT N_("Stream name")
+#define NAME_LONGTEXT N_( \
+ "This Stream name will be sent to the receiver using the rist RTCP channel" )
+
+/* Module descriptor: registers a "sout access" module reachable through the
+ * "rist" and "tr06" shortcuts, with Open()/Close() as callbacks. All option
+ * names are prefixed with SOUT_CFG_PREFIX. */
+vlc_module_begin()
+
+ set_shortname( N_("RIST") )
+ set_description( N_("RIST stream output") )
+ set_category( CAT_SOUT )
+ set_subcategory( SUBCAT_SOUT_ACO )
+
+ /* Size of the staging packet buffer allocated in Open() */
+ add_integer( SOUT_CFG_PREFIX "packet-size", RIST_TARGET_PACKET_SIZE,
+ N_("RIST target packet size (bytes)"), NULL, true )
+ add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_CACHING_DELAY,
+ CACHING_TEXT, CACHING_LONGTEXT, true )
+ add_integer( SOUT_CFG_PREFIX "buffer-size", DEFAULT_BUFFER_SIZE,
+ BUFFER_TEXT, BUFFER_LONGTEXT, true )
+ add_integer( SOUT_CFG_PREFIX "ssrc", 0,
+ SSRC_TEXT, SSRC_LONGTEXT, true )
+ add_string( SOUT_CFG_PREFIX "stream-name", NULL, NAME_TEXT, NAME_LONGTEXT, true )
+
+ set_capability( "sout access", 0 )
+ add_shortcut( "rist", "tr06" )
+
+ set_callbacks( Open, Close )
+
+vlc_module_end ()
--
2.28.0
More information about the vlc-devel
mailing list