[vlc-commits] access: add RIST module

Sergio Ammirata git at videolan.org
Mon Nov 5 18:30:04 CET 2018


vlc | branch: master | Sergio Ammirata <sergio at ammirata.net> | Mon Nov  5 09:39:16 2018 -0500| [2cb6e8459739b558acdf4382b8c510284276bd32] | committer: Thomas Guillem

access: add RIST module

RIST for the Reliable Internet Stream Transport Protocol

The implementation follows the Video Services Forum (VSF) Technical
Recommendation TR-06 which defines an ARQ based UDP transmission protocol
for real-time streaming over lossy networks (internet, wifi, etc).

Signed-off-by: Thomas Guillem <thomas at gllm.fr>

> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=2cb6e8459739b558acdf4382b8c510284276bd32
---

 NEWS                       |    1 +
 modules/MODULES_LIST       |    1 +
 modules/access/Makefile.am |    8 +
 modules/access/rist.c      | 1088 ++++++++++++++++++++++++++++++++++++++++++++
 modules/access/rist.h      |  335 ++++++++++++++
 po/POTFILES.in             |    2 +
 6 files changed, 1435 insertions(+)

diff --git a/NEWS b/NEWS
index 3054bb84a7..6115713dcd 100644
--- a/NEWS
+++ b/NEWS
@@ -29,6 +29,7 @@ Codecs:
 
 Access:
  * Enable SMB2 / SMB3 support on mobile ports with libsmb2
+ * Added support for the RIST (Reliable Internet Stream Transport) Protocol
 
 Video output:
  * Remove aa plugin
diff --git a/modules/MODULES_LIST b/modules/MODULES_LIST
index 79161ae3ed..a0f4d54f64 100644
--- a/modules/MODULES_LIST
+++ b/modules/MODULES_LIST
@@ -17,6 +17,7 @@ $Id$
  * access_output_srt: SRT (Secure Reliable Transport) access_output module
  * access_output_udp: UDP Network access_output module
  * access_qtsound: Quicktime Audio Capture
+ * access_rist: RIST (Reliable Internet Stream Transport) access module
  * access_srt: SRT(Secure Reliable Transport) access module
  * access_wasapi: WASAPI audio input
  * accesstweaks: access control tweaking module (dev tool)
diff --git a/modules/access/Makefile.am b/modules/access/Makefile.am
index 5abe3a1a5f..31c872b915 100644
--- a/modules/access/Makefile.am
+++ b/modules/access/Makefile.am
@@ -428,3 +428,11 @@ libaccess_srt_plugin_la_LIBADD = $(SRT_LIBS)
 libaccess_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(accessdir)'
 access_LTLIBRARIES += $(LTLIBaccess_srt)
 EXTRA_LTLIBRARIES += libaccess_srt_plugin.la
+
+### RIST ###
+
+librist_plugin_la_SOURCES = access/rist.c access/rist.h
+librist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
+if HAVE_BITSTREAM
+access_LTLIBRARIES += librist_plugin.la
+endif
diff --git a/modules/access/rist.c b/modules/access/rist.c
new file mode 100644
index 0000000000..94103f7293
--- /dev/null
+++ b/modules/access/rist.c
@@ -0,0 +1,1088 @@
+/*****************************************************************************
+ * rist.c: RIST (Reliable Internet Stream Transport) input module
+ *****************************************************************************
+ * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
+ * Copyright (C) 2018, SipRadius LLC
+ *
+ * Authors: Sergio Ammirata <sergio at ammirata.net>
+ *          Daniele Lacamera <root at danielinux.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_common.h>
+#include <vlc_interrupt.h>
+#include <vlc_plugin.h>
+#include <vlc_access.h>
+#include <vlc_threads.h>
+#include <vlc_network.h>
+#include <vlc_block.h>
+#include <vlc_url.h>
+#ifdef HAVE_POLL
+#include <poll.h>
+#endif
+#include <bitstream/ietf/rtcp_rr.h>
+#include <bitstream/ietf/rtcp_sdes.h>
+#include <bitstream/ietf/rtcp_fb.h>
+#include <bitstream/ietf/rtp.h>
+
+#include "rist.h"
+
+/* The default latency is 1000 ms */
+#define RIST_DEFAULT_LATENCY 1000
+/* The default nack retry interval */
+#define RIST_DEFAULT_RETRY_INTERVAL 132
+/* The default packet re-ordering buffer */
+#define RIST_DEFAULT_REORDER_BUFFER 70
+/* The default max packet size */
+#define RIST_MAX_PACKET_SIZE 1472
+/* The default timeout is 5 ms */
+#define RIST_DEFAULT_POLL_TIMEOUT 5
+/* The max retry count for nacks */
+#define RIST_MAX_RETRIES 10
+/* The rate at which we process and send nack requests */
+#define NACK_INTERVAL 5 /*ms*/
+/* Calculate and print stats once per second */
+#define STATS_INTERVAL 1000 /*ms*/
+
+static const int nack_type[] = {
+    0, 1,
+};
+
+static const char *const nack_type_names[] = {
+    N_("Range"), N_("Bitmask"),
+};
+
+enum NACK_TYPE {
+    NACK_FMT_RANGE = 0,
+    NACK_FMT_BITMASK
+};
+
+typedef struct
+{
+    struct rist_flow *flow;
+    char             sender_name[MAX_CNAME];
+    enum NACK_TYPE   nack_type;
+    uint64_t         last_data_rx;
+    uint64_t         last_nack_tx;
+    vlc_thread_t     thread;
+    int              i_max_packet_size;
+    int              i_poll_timeout;
+    int              i_poll_timeout_current;
+    bool             eof_on_reset;
+    block_fifo_t     *p_fifo;
+    vlc_mutex_t      lock;
+    uint64_t         last_message;
+    uint64_t         last_reset;
+    /* stat variables */
+    uint32_t         i_poll_timeout_zero_count;
+    uint32_t         i_poll_timeout_nonzero_count;
+    uint64_t         i_last_stat;
+    float            vbr_ratio;
+    uint16_t         vbr_ratio_count;
+    uint32_t         i_lost_packets;
+    uint32_t         i_recovered_packets;
+    uint32_t         i_reordered_packets;
+    uint32_t         i_total_packets;
+} stream_sys_t;
+
+static int Control(stream_t *p_access, int i_query, va_list args)
+{
+    switch( i_query )
+    {
+        case STREAM_CAN_SEEK:
+        case STREAM_CAN_FASTSEEK:
+        case STREAM_CAN_PAUSE:
+        case STREAM_CAN_CONTROL_PACE:
+            *va_arg( args, bool * ) = false;
+            break;
+
+        case STREAM_GET_PTS_DELAY:
+            *va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
+                   var_InheritInteger(p_access, "network-caching") );
+            break;
+
+        default:
+            return VLC_EGENERIC;
+    }
+
+    return VLC_SUCCESS;
+}
+
+static struct rist_flow *rist_init_rx(void)
+{
+    struct rist_flow *flow = calloc(1, sizeof(struct rist_flow));
+    if (!flow)
+        return NULL;
+
+    flow->reset = 2;
+    flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
+
+    if ( unlikely( flow->buffer == NULL ) )
+    {
+        free(flow);
+        return NULL;
+    }
+    return flow;
+}
+
+static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf, size_t len, 
+    const struct sockaddr *peer, socklen_t slen)
+{
+    vlc_mutex_lock( &lock );
+    rist_WriteTo_i11e(fd, buf, len, peer, slen);
+    vlc_mutex_unlock( &lock );
+}
+
+static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
+             parsed_url->psz_host, parsed_url->i_port,
+             parsed_url->psz_host, parsed_url->i_port+1);
+
+    p_sys->flow = rist_init_rx();
+    if (!p_sys->flow)
+        return NULL;
+
+    p_sys->flow->fd_in = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port, NULL,
+                0, IPPROTO_UDP);
+    if (p_sys->flow->fd_in < 0)
+    {
+        msg_Err( p_access, "cannot open input socket" );
+        return NULL;
+    }
+
+    p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1, 
+        NULL, 0, IPPROTO_UDP);
+    if (p_sys->flow->fd_nack < 0)
+    {
+        msg_Err( p_access, "cannot open nack socket" );
+        return NULL;
+    }
+
+    populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
+    msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
+
+    return  p_sys->flow;
+}
+
+static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
+{
+    if (flow->ri <= flow->wi) {
+        return ((idx > flow->ri) && (idx <= flow->wi));
+    } else {
+        return ((idx > flow->ri) || (idx <= flow->wi));
+    }
+}
+
+static void send_rtcp_feedback(stream_t *p_access, struct rist_flow *flow)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    int namelen = strlen(flow->cname) + 1;
+
+    /* we need to make sure it is a multiple of 4, pad if necessary */
+    if ((namelen - 2) & 0x3)
+        namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
+
+    int rtcp_feedback_size = RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE + namelen;
+    uint8_t *buf = malloc(rtcp_feedback_size);
+    if ( unlikely( buf == NULL ) )
+        return;
+
+    /* Populate RR */
+    uint8_t *rr = buf;
+    rtp_set_hdr(rr);
+    rtcp_rr_set_pt(rr);
+    rtcp_set_length(rr, 1);
+    rtcp_fb_set_int_ssrc_pkt_sender(rr, 0);
+
+    /* Populate SDES */
+    uint8_t *p_sdes = (buf + RTCP_EMPTY_RR_SIZE);
+    rtp_set_hdr(p_sdes);
+    rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
+    rtcp_sdes_set_pt(p_sdes);
+    rtcp_set_length(p_sdes, (namelen >> 2) + 2);
+    rtcp_sdes_set_cname(p_sdes, 1);
+    rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
+    uint8_t *p_sdes_name = (buf + RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE);
+    strlcpy((char *)p_sdes_name, flow->cname, namelen);
+
+    /* Write to Socket */
+    rist_WriteTo_i11e_Locked(p_sys->lock, flow->fd_nack, buf, rtcp_feedback_size, 
+        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    free(buf);
+    buf = NULL;
+}
+
+static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+    int len = 0;
+
+    int bbnack_bufsize = RTCP_FB_HEADER_SIZE +
+        RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
+    uint8_t *buf = malloc(bbnack_bufsize);
+    if ( unlikely( buf == NULL ) )
+        return;
+
+    /* Populate NACKS */
+    uint8_t *nack = buf;
+    rtp_set_hdr(nack);
+    rtcp_fb_set_fmt(nack, NACK_FMT_BITMASK);
+    rtcp_set_pt(nack, RTCP_PT_RTPFB);
+    rtcp_set_length(nack, 2 + nack_count);
+    /*uint8_t name[4] = "RIST";*/
+    /*rtcp_fb_set_ssrc_media_src(nack, name);*/
+    len += RTCP_FB_HEADER_SIZE;
+    /* TODO : group together */
+    uint16_t nacks[MAX_NACKS];
+    memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
+    for (int i = 0; i < nack_count; i++) {
+        uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
+        rtcp_fb_nack_set_packet_id(nack_record, nacks[i]);
+        rtcp_fb_nack_set_bitmask_lost(nack_record, 0);
+    }
+    len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
+
+    /* Write to Socket */
+    rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len, 
+        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    free(buf);
+    buf = NULL;
+}
+
+static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+    int len = 0;
+
+    int rbnack_bufsize = RTCP_FB_HEADER_SIZE +
+        RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
+    uint8_t *buf = malloc(rbnack_bufsize);
+    if ( unlikely( buf == NULL ) )
+        return;
+
+    /* Populate NACKS */
+    uint8_t *nack = buf;
+    rtp_set_hdr(nack);
+    rtcp_fb_set_fmt(nack, NACK_FMT_RANGE);
+    rtcp_set_pt(nack, RTCP_PT_RTPFR);
+    rtcp_set_length(nack, 2 + nack_count);
+    uint8_t name[4] = "RIST";
+    rtcp_fb_set_ssrc_media_src(nack, name);
+    len += RTCP_FB_HEADER_SIZE;
+    /* TODO : group together */
+    uint16_t nacks[MAX_NACKS];
+    memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
+    for (int i = 0; i < nack_count; i++)
+    {
+        uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
+        rtcp_fb_nack_set_range_start(nack_record, nacks[i]);
+        rtcp_fb_nack_set_range_extra(nack_record, 0);
+    }
+    len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
+
+    /* Write to Socket */
+    rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len, 
+        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    free(buf);
+    buf = NULL;
+}
+
+static void send_nacks(stream_t *p_access, struct rist_flow *flow)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    struct rtp_pkt *pkt;
+    uint16_t idx;
+    uint64_t last_ts = 0;
+    uint16_t null_count = 0;
+    int nacks_len = 0;
+    uint16_t nacks[MAX_NACKS];
+
+    idx = flow->ri;
+    while(idx++ != flow->wi)
+    {
+        pkt = &(flow->buffer[idx]);
+        if (pkt->buffer == NULL)
+        {
+            if (nacks_len + 1 >= MAX_NACKS)
+            {
+                break;
+            }
+            else
+            {
+                null_count++;
+                /* TODO: after adding average spacing calculation, change this formula
+                   to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
+                uint64_t extrapolated_ts = last_ts;
+                /* Find out the age and add it only if necessary */
+                int retry_count = flow->nacks_retries[idx];
+                uint64_t age = flow->hi_timestamp - extrapolated_ts;
+                uint64_t expiration;
+                if (retry_count == 0){
+                    expiration = flow->reorder_buffer;
+                } else {
+                    expiration = flow->nacks_retries[idx] * flow->retry_interval;
+                }
+                if (age > expiration && retry_count <= flow->max_retries)
+                {
+                    flow->nacks_retries[idx]++;
+                    nacks[nacks_len++] = idx;
+                    msg_Dbg(p_access, "Sending NACK for seq %d, age %"PRId64" ms, retry %u, " \
+                        "expiration %"PRId64" ms", idx, ts_get_from_rtp(age)/1000, 
+                        flow->nacks_retries[idx], ts_get_from_rtp(expiration)/1000);
+                }
+            }
+        }
+        else
+        {
+            last_ts = pkt->rtp_ts;
+            null_count = 0;
+        }
+    }
+    if (nacks_len > 0)
+    {
+        block_t *pkt_nacks = block_Alloc(nacks_len * 2);
+        if (pkt_nacks)
+        {
+            memcpy(pkt_nacks->p_buffer, nacks, nacks_len * 2);
+            pkt_nacks->i_buffer = nacks_len * 2;
+            block_FifoPut( p_sys->p_fifo, pkt_nacks );
+        }
+    }
+}
+
+static int sockaddr_cmp(struct sockaddr *x, struct sockaddr *y)
+{
+#define CMP(a, b) if (a != b) return a < b ? -1 : 1
+
+    CMP(x->sa_family, y->sa_family);
+
+    if (x->sa_family == AF_INET)
+    {
+        struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
+        CMP(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr));
+        CMP(ntohs(xin->sin_port), ntohs(yin->sin_port));
+    }
+    else if (x->sa_family == AF_INET6)
+    {
+        struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
+        int r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr, 
+            sizeof(xin6->sin6_addr.s6_addr));
+        if (r != 0)
+            return r;
+        CMP(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port));
+        CMP(xin6->sin6_flowinfo, yin6->sin6_flowinfo);
+        CMP(xin6->sin6_scope_id, yin6->sin6_scope_id);
+    }
+
+#undef CMP
+    return 0;
+}
+
+static void print_sockaddr_info_change(stream_t *p_access, struct sockaddr *x, struct sockaddr *y)
+{
+    if (x->sa_family == AF_INET)
+    {
+        struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
+        msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d", 
+            inet_ntoa(xin->sin_addr), ntohs(xin->sin_port), inet_ntoa(yin->sin_addr), 
+            ntohs(yin->sin_port));
+    }
+    else if (x->sa_family == AF_INET6)
+    {
+        struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
+        char oldstr[INET6_ADDRSTRLEN];
+        char newstr[INET6_ADDRSTRLEN];
+        inet_ntop(xin6->sin6_family, &xin6->sin6_addr, oldstr, sizeof(struct in6_addr));
+        inet_ntop(yin6->sin6_family, &yin6->sin6_addr, newstr, sizeof(struct in6_addr));
+        msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d", 
+            oldstr, ntohs(xin6->sin6_port), newstr, ntohs(yin6->sin6_port));
+    }
+}
+
+static void print_sockaddr_info(stream_t *p_access, struct sockaddr *x)
+{
+    if (x->sa_family == AF_INET)
+    {
+        struct sockaddr_in *xin = (void*)x;
+        msg_Info(p_access, "Peer IP:Port %s:%d", inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
+    }
+    else if (x->sa_family == AF_INET6)
+    {
+        struct sockaddr_in6 *xin6 = (void*)x;
+        char str[INET6_ADDRSTRLEN];
+        inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
+        msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin6->sin6_port));
+    }
+}
+
+static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_in, size_t len, 
+    struct sockaddr *peer, socklen_t slen)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    uint8_t  ptype;
+    uint16_t processed_bytes = 0;
+    uint16_t records;
+    char new_sender_name[MAX_CNAME];
+    uint8_t *buf;
+
+    while (processed_bytes < len) {
+        buf = buf_in + processed_bytes;
+        /* safety checks */
+        uint16_t bytes_left = len - processed_bytes + 1;
+        if ( bytes_left < 4 )
+        {
+            /* we must have at least 4 bytes */
+            msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d", 
+                bytes_left);
+            return;
+        }
+        else if (!rtp_check_hdr(buf))
+        {
+            /* check for a valid rtp header */
+            msg_Err(p_access, "Malformed rtcp packet starting with %02x, ignoring.", buf[0]);
+            return;
+        }
+
+        ptype =  rtcp_get_pt(buf);
+        records = rtcp_get_length(buf);
+        uint16_t bytes = (uint16_t)(4 * (1 + records));
+        if (bytes > bytes_left)
+        {
+            /* check for a sane number of bytes */
+            msg_Err(p_access, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
+                "packet, got a buffer of %u bytes.", rtcp_get_length(buf), bytes, bytes_left);
+            return;
+        }
+
+        switch(ptype) {
+            case RTCP_PT_RTPFR:
+            case RTCP_PT_RTPFB:
+                break;
+
+            case RTCP_PT_RR:
+                break;
+
+            case RTCP_PT_SDES:
+                {
+                    /* Check for changes in source IP address or port */
+                    int8_t name_length = rtcp_sdes_get_name_length(buf);
+                    if (name_length > bytes_left)
+                    {
+                        /* check for a sane number of bytes */
+                        msg_Err(p_access, "Malformed SDES packet, wrong cname len %u, got a " \
+                            "buffer of %u bytes.", name_length, bytes_left);
+                        return;
+                    }
+                    bool ip_port_changed = false;
+                    if (sockaddr_cmp((struct sockaddr *)&flow->peer_sockaddr, peer) != 0)
+                    {
+                        ip_port_changed = true;
+                        if(flow->peer_socklen > 0)
+                            print_sockaddr_info_change(p_access, 
+                                (struct sockaddr *)&flow->peer_sockaddr, peer);
+                        else
+                            print_sockaddr_info(p_access, peer);
+                        vlc_mutex_lock( &p_sys->lock );
+                        memcpy(&flow->peer_sockaddr, peer, sizeof(struct sockaddr_storage));
+                        flow->peer_socklen = slen;
+                        vlc_mutex_unlock( &p_sys->lock );
+                    }
+
+                    /* Check for changes in cname */
+                    bool peer_name_changed = false;
+                    memset(new_sender_name, 0, MAX_CNAME);
+                    memcpy(new_sender_name, buf + RTCP_SDES_SIZE, name_length);
+                    if (memcmp(new_sender_name, p_sys->sender_name, name_length) != 0)
+                    {
+                        peer_name_changed = true;
+                        if (strcmp(p_sys->sender_name, "") == 0)
+                            msg_Info(p_access, "Peer Name: %s", new_sender_name);
+                        else
+                            msg_Info(p_access, "Peer Name change detected: old Name: %s, new " \
+                                "Name: %s", p_sys->sender_name, new_sender_name);
+                        memset(p_sys->sender_name, 0, MAX_CNAME);
+                        memcpy(p_sys->sender_name, buf + RTCP_SDES_SIZE, name_length);
+                    }
+
+                    /* Reset the buffer as the source must have been restarted */
+                    if (peer_name_changed || ip_port_changed)
+                    {
+                        /* reset the buffer */
+                        flow->reset = 1;
+                    }
+                }
+                break;
+
+            case RTCP_PT_SR:
+                break;
+
+            default:
+                msg_Err(p_access, "   Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
+        }
+        processed_bytes += (4 * (1 + records));
+    }
+}
+
+static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf, size_t len)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+
+    /* safety checks */
+    if ( len < RTP_HEADER_SIZE )
+    {
+        /* check if packet size >= rtp header size */
+        msg_Err(p_access, "Rist rtp packet must have at least 12 bytes, we have %lu", len);
+        return false;
+    }
+    else if (!rtp_check_hdr(buf))
+    {
+        /* check for a valid rtp header */
+        msg_Err(p_access, "Malformed rtp packet header starting with %02x, ignoring.", buf[0]);
+        return false;
+    }
+
+    uint16_t idx = rtp_get_seqnum(buf);
+    uint32_t pkt_ts = rtp_get_timestamp(buf);
+    bool retrasnmitted = false;
+    bool success = true;
+    uint64_t now = vlc_tick_now();
+
+    if (flow->reset == 2)
+    {
+        if ((uint64_t)(now - p_sys->last_message) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) ) {
+            msg_Info(p_access, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
+        }
+        p_sys->last_message = now;
+        return success;
+    }
+    else if (flow->reset == 1)
+    {
+        msg_Info(p_access, "Traffic detected after buffer reset");
+        /* First packet in the queue */
+        flow->hi_timestamp = pkt_ts;
+        msg_Info(p_access, "ts@%u", flow->hi_timestamp);
+        flow->wi = idx;
+        flow->ri = idx;
+        flow->reset = 0;
+    }
+
+    /* Check to see if this is a retransmission or a regular packet */
+    if (buf[11] & (1 << 0))
+    {
+        msg_Dbg(p_access, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx, flow->ri, flow->wi, 
+            flow->wi-flow->ri);
+        p_sys->i_recovered_packets++;
+        retrasnmitted = true;
+    }
+    else if (flow->wi != flow->ri)
+    {
+        /* Reset counter to 0 on incoming holes */
+        /* Regular packets only as retransmits are expected to come in out of order */
+        uint16_t idxnext = (uint16_t)(flow->wi + 1);
+        if (idx != idxnext)
+        {
+            if (idx > idxnext) {
+                msg_Dbg(p_access, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]", 
+                    idx, idxnext, idx - idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
+            } else {
+                p_sys->i_reordered_packets++;
+                msg_Dbg(p_access, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx, 
+                    idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
+            }
+            uint16_t zero_counter = (uint16_t)(flow->wi + 1);
+            while(zero_counter++ != idx) {
+                flow->nacks_retries[zero_counter] = 0;
+            }
+            /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d", 
+                idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
+        }
+    }
+
+    /* Always replace the existing one with the new one */
+    struct rtp_pkt *pkt;
+    pkt = &(flow->buffer[idx]);
+    if (pkt->buffer && pkt->buffer->i_buffer > 0)
+    {
+        block_Release(pkt->buffer);
+        pkt->buffer = NULL;
+    }
+    pkt->buffer = block_Alloc(len);
+    if (!pkt->buffer)
+        return false;
+
+    pkt->buffer->i_buffer = len;
+    memcpy(pkt->buffer->p_buffer, buf, len);
+    pkt->rtp_ts = pkt_ts;
+    p_sys->last_data_rx = vlc_tick_now();
+    /* Reset the try counter regardless of wether it was a retransmit or not */
+    flow->nacks_retries[idx] = 0;
+
+    if (retrasnmitted)
+        return success;
+
+    p_sys->i_total_packets++;
+    /* Perform discontinuity checks and udpdate counters */
+    if (!is_index_in_range(flow, idx) && pkt_ts >= flow->hi_timestamp)
+    {
+        if ((pkt_ts - flow->hi_timestamp) > flow->hi_timestamp/10)
+        {
+            msg_Info(p_access, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx, 
+                flow->wi, pkt_ts, flow->hi_timestamp);
+            flow->reset = 1;
+            success = false;
+        }
+        else
+        {
+            flow->wi = idx;
+            flow->hi_timestamp = pkt_ts;
+        }
+    }
+    else if (!is_index_in_range(flow, idx))
+    {
+        /* incoming timestamp just jumped back in time or index is outside of scope */
+        msg_Info(p_access, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx, 
+            flow->wi, pkt_ts, flow->hi_timestamp);
+        flow->reset = 1;
+        success = false;
+    }
+
+    return success;
+}
+
+static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    block_t *pktout = NULL;
+    struct rtp_pkt *pkt;
+    uint16_t idx;
+    if (flow->ri == flow->wi || flow->reset > 0)
+        return NULL;
+
+    idx = flow->ri;
+    bool found_data = false;
+    uint16_t loss_amount = 0;
+    while(idx++ != flow->wi) {
+
+        pkt = &(flow->buffer[idx]);
+        if (!pkt->buffer)
+        {
+            /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
+            loss_amount++;
+            /* We move ahead until we find a timestamp but we do not move the cursor.
+             * None of them are guaranteed packet loss because we do not really
+             * know their timestamps. They might still arrive on the next loop.
+             * We can confirm the loss only if we get a valid packet in the loop below. */
+            continue;
+        }
+
+        /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx, 
+            flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
+        if (flow->hi_timestamp > (uint32_t)(pkt->rtp_ts + flow->rtp_latency))
+        {
+            /* Populate output packet now but remove rtp header from source */
+            int newSize = pkt->buffer->i_buffer - RTP_HEADER_SIZE;
+            pktout = block_Alloc(newSize);
+            if (pktout)
+            {
+                pktout->i_buffer = newSize;
+                memcpy(pktout->p_buffer, pkt->buffer->p_buffer + RTP_HEADER_SIZE, newSize);
+                /* free the buffer and increase the read index */
+                flow->ri = idx;
+                /* TODO: calculate average duration using buffer average (bring from sender) */
+                found_data = true;
+            }
+            block_Release(pkt->buffer);
+            pkt->buffer = NULL;
+            break;
+        }
+
+    }
+
+    if (loss_amount > 0 && found_data == true)
+    {
+        /* Packet loss confirmed, we found valid data after the holes */
+        msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount, 
+            flow->ri, flow->wi);
+        p_sys->i_lost_packets += loss_amount;
+    }
+
+    return pktout;
+}
+
+static void *rist_thread(void *data)
+{
+    stream_t *p_access = data;
+    stream_sys_t *p_sys = p_access->p_sys;
+
+    /* Process nacks every 5ms */
+    /* We only ask for the relevant ones */
+    for (;;) {
+        block_t *pkt_nacks = block_FifoGet(p_sys->p_fifo);
+
+        int canc = vlc_savecancel();
+
+        /* there are two bytes per nack */
+        uint16_t nack_count = (uint16_t)pkt_nacks->i_buffer/2;
+        switch(p_sys->nack_type) {
+            case NACK_FMT_BITMASK:
+                send_bbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
+                break;
+
+            default:
+                send_rbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
+        }
+
+        if (nack_count > 1)
+            msg_Dbg(p_access, "Sent %u NACKs !!!", nack_count);
+        block_Release(pkt_nacks);
+
+        vlc_restorecancel (canc);
+    }
+
+    return NULL;
+}
+
+static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+    uint64_t now;
+    *eof = false;
+    block_t *pktout = NULL;
+    struct pollfd pfd[2];
+    int ret;
+    ssize_t r;
+    struct sockaddr_storage peer;
+    socklen_t slen = sizeof(struct sockaddr_storage);
+    struct rist_flow *flow = p_sys->flow;
+
+    if (vlc_killed() || (flow->reset == 1 && p_sys->eof_on_reset))
+    {
+        *eof = true;
+        return NULL;
+    }
+
+    pfd[0].fd = flow->fd_in;
+    pfd[0].events = POLLIN;
+    pfd[1].fd = flow->fd_nack;
+    pfd[1].events = POLLIN;
+
+    /* The protocol uses a fifo buffer with a fixed time delay.
+     * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the 
+     * packets. If I waited indefinitely for data coming in, the rate and delay of output packets 
+     * would be wrong. I am calling the rist_dequeue function every time a data packet comes in 
+     * and also every time we get a poll timeout. The configurable poll timeout is for controling 
+     * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers 
+     * most cases. */
+
+    ret = vlc_poll_i11e(pfd, 2, p_sys->i_poll_timeout_current);
+    if (unlikely(ret < 0))
+        return NULL;
+    else if (ret == 0)
+    {
+        /* Poll timeout, check the queue for the next packet that needs to be delivered */
+        pktout = rist_dequeue(p_access, flow);
+        /* if there is data, we need to come back faster to finish emptying it */
+        if (pktout) {
+            p_sys->i_poll_timeout_current = 0;
+            p_sys->i_poll_timeout_zero_count++;
+        } else {
+            p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
+            p_sys->i_poll_timeout_nonzero_count++;
+        }
+    }
+    else
+    {
+
+        uint8_t *buf = malloc(p_sys->i_max_packet_size);
+        if ( unlikely( buf == NULL ) )
+            return NULL;
+
+        /* Process rctp incoming data */
+        if (pfd[1].revents & POLLIN)
+        {
+            r = rist_ReadFrom_i11e(flow->fd_nack, buf, p_sys->i_max_packet_size, 
+                (struct sockaddr *)&peer, &slen);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
+            }
+            else {
+                rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
+            }
+        }
+
+        /* Process regular incoming data */
+        if (pfd[0].revents & POLLIN)
+        {
+            r = rist_Read_i11e(flow->fd_in, buf, p_sys->i_max_packet_size);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "socket %d error: %s\n", flow->fd_in, gai_strerror(errno));
+            }
+            else
+            {
+                /* rist_input will process and queue the pkt */
+                if (rist_input(p_access, flow, buf, r))
+                {
+                    /* Check the queue for the next packet that needs to be delivered */
+                    pktout = rist_dequeue(p_access, flow);
+                    if (pktout) {
+                        p_sys->i_poll_timeout_current = 0;
+                        p_sys->i_poll_timeout_zero_count++;
+                    } else {
+                        p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
+                        p_sys->i_poll_timeout_nonzero_count++;
+                    }
+                }
+                else
+                {
+                    if (p_sys->eof_on_reset)
+                        *eof = true;
+                }
+            }
+        }
+
+        free(buf);
+        buf = NULL;
+    }
+
+    now = vlc_tick_now();
+
+    /* Process stats and print them out */
+    /* We need to measure some items every 70ms */
+    uint64_t interval = (now - flow->feedback_time);
+    if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
+    {
+        if (p_sys->i_poll_timeout_nonzero_count > 0)
+        {
+            float ratio = (float)p_sys->i_poll_timeout_zero_count 
+                / (float)p_sys->i_poll_timeout_nonzero_count;
+            if (ratio <= 1)
+                p_sys->vbr_ratio += 1 - ratio;
+            else
+                p_sys->vbr_ratio += ratio - 1;
+            p_sys->vbr_ratio_count++;
+            /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f", 
+                p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
+            p_sys->i_poll_timeout_zero_count = 0;
+            p_sys->i_poll_timeout_nonzero_count =  0;
+        }
+    }
+    /* We print out the stats once per second */
+    interval = (now - p_sys->i_last_stat);
+    if ( interval > VLC_TICK_FROM_MS(STATS_INTERVAL) )
+    {
+        if ( p_sys->i_lost_packets > 0)
+            msg_Err(p_access, "We have %d lost packets", p_sys->i_lost_packets);
+        float ratio = 1;
+        if (p_sys->vbr_ratio_count > 0)
+            ratio = p_sys->vbr_ratio / (float)p_sys->vbr_ratio_count;
+        float quality = 100;
+        if (p_sys->i_total_packets > 0)
+            quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets + 
+                p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
+        if (quality != 100)
+            msg_Info(p_access, "STATS: Total %u, Recovered %u, Reordered %u, Lost %u, VBR Score " \
+                "%.2f, Link Quality %.2f%%", p_sys->i_total_packets, p_sys->i_recovered_packets, 
+                p_sys->i_reordered_packets, p_sys->i_lost_packets, ratio, quality);
+        p_sys->i_last_stat = now;
+        p_sys->vbr_ratio = 0;
+        p_sys->vbr_ratio_count = 0;
+        p_sys->i_lost_packets = 0;
+        p_sys->i_recovered_packets = 0;
+        p_sys->i_reordered_packets = 0;
+        p_sys->i_total_packets = 0;
+    }
+
+    /* Send rtcp feedback every RTCP_INTERVAL */
+    interval = (now - flow->feedback_time);
+    if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
+    {
+        /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval, 
+        VLC_TICK_FROM_MS(RTCP_INTERVAL)); */
+        send_rtcp_feedback(p_access, flow);
+        flow->feedback_time = now;
+    }
+
+    /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
+    interval = (now - p_sys->last_nack_tx);
+    if ( interval > VLC_TICK_FROM_MS(NACK_INTERVAL) )
+    {
+        send_nacks(p_access, p_sys->flow);
+        p_sys->last_nack_tx = now;
+    }
+
+    /* Safety check for when the input stream stalls */
+    if ( p_sys->last_data_rx > 0 && now > p_sys->last_data_rx &&
+        (uint64_t)(now - p_sys->last_data_rx) >  (uint64_t)VLC_TICK_FROM_MS(flow->latency) &&
+        (uint64_t)(now - p_sys->last_reset) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) )
+    {
+        msg_Err(p_access, "No data received for %"PRId64" ms, resetting buffers", 
+            (int64_t)(now - p_sys->last_data_rx)/1000);
+        p_sys->last_reset = now;
+        flow->reset = 1;
+    }
+
+    if (pktout)
+        return pktout;
+    else
+        return NULL;
+}
+
+static void Clean( stream_t *p_access )
+{
+    stream_sys_t *p_sys = p_access->p_sys;
+
+    if( likely(p_sys->p_fifo != NULL) )
+        block_FifoRelease( p_sys->p_fifo );
+
+    if (p_sys->flow)
+    {
+        if (p_sys->flow->fd_in >= 0)
+            net_Close (p_sys->flow->fd_in);
+        if (p_sys->flow->fd_nack >= 0)
+            net_Close (p_sys->flow->fd_nack);
+        for (int i=0; i<RIST_QUEUE_SIZE; i++) {
+            struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
+            if (pkt->buffer && pkt->buffer->i_buffer > 0) {
+                block_Release(pkt->buffer);
+                pkt->buffer = NULL;
+            }
+        }
+        free(p_sys->flow->buffer);
+        free(p_sys->flow);
+    }
+
+    vlc_mutex_destroy( &p_sys->lock );
+}
+
+static void Close(vlc_object_t *p_this)
+{
+    stream_t     *p_access = (stream_t*)p_this;
+    stream_sys_t *p_sys = p_access->p_sys;
+
+    vlc_cancel(p_sys->thread);
+    vlc_join(p_sys->thread, NULL);
+
+    Clean( p_access );
+}
+
+static int Open(vlc_object_t *p_this)
+{
+    stream_t     *p_access = (stream_t*)p_this;
+    stream_sys_t *p_sys = NULL;
+    vlc_url_t     parsed_url = { 0 };
+
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+    if( unlikely( p_sys == NULL ) )
+        return VLC_ENOMEM;
+
+    p_access->p_sys = p_sys;
+
+    memset(p_sys->sender_name, 0, MAX_CNAME);
+    vlc_mutex_init( &p_sys->lock );
+
+    if ( vlc_UrlParse( &parsed_url, p_access->psz_url ) == -1 )
+    {
+        msg_Err( p_access, "Failed to parse input URL (%s)",
+            p_access->psz_url );
+        goto failed;
+    }
+
+    /* Initialize rist flow */
+    p_sys->flow = rist_udp_receiver(p_access, &parsed_url);
+    vlc_UrlClean( &parsed_url );
+    if (!p_sys->flow)
+    {
+        msg_Err( p_access, "Failed to open rist flow (%s)",
+            p_access->psz_url );
+        goto failed;
+    }
+
+    p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
+    p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
+    p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
+    p_sys->eof_on_reset = var_InheritBool( p_access, "eof-on-reset" );
+    p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
+    p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
+    p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
+    p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
+    msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
+
+    /* Convert to rtp times */
+    p_sys->flow->rtp_latency = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->latency));
+    p_sys->flow->retry_interval = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->retry_interval));
+    p_sys->flow->reorder_buffer = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->reorder_buffer));
+
+    p_sys->p_fifo = block_FifoNew();
+    if( unlikely(p_sys->p_fifo == NULL) )
+        goto failed;
+
+    /* This extra thread is for sending feedback/nack packets even when no data comes in */
+    if (vlc_clone(&p_sys->thread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
+    {
+        msg_Err(p_access, "Failed to create worker thread.");
+        goto failed;
+    }
+
+    p_access->pf_block = BlockRIST;
+    p_access->pf_control = Control;
+
+    return VLC_SUCCESS;
+
+failed:
+    Clean( p_access );
+    return VLC_EGENERIC;
+}
+
+/* Module descriptor */
+vlc_module_begin ()
+
+    set_shortname( N_("RIST") )
+    set_description( N_("RIST input") )
+    set_category( CAT_INPUT )
+    set_subcategory( SUBCAT_INPUT_ACCESS )
+
+    add_integer( "packet-size", RIST_MAX_PACKET_SIZE,
+        N_("RIST maximum packet size (bytes)"), NULL, true )
+    add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT,
+        N_("This controls the maximum jitter that will be passed to the demux/decode chain " \
+            "(defaut is 5ms)"),
+        N_("The lower the value, the more CPU cycles the algortyhm will consume"), true )
+    add_integer( "latency", RIST_DEFAULT_LATENCY, N_("RIST latency (ms)"), NULL, true )
+    add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL, N_("RIST nack retry interval (ms)"),
+        NULL, true )
+    add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
+        NULL, true )
+    add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
+    add_bool( "eof-on-reset", false, "Trigger and EOF event when the buffer reset is triggered",
+        "This is probably useful when you are decoding but not so much if you are streaming", true )
+    add_integer( "nack-type", NACK_FMT_RANGE,
+            N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
+        change_integer_list( nack_type, nack_type_names )
+
+    set_capability( "access", 0 )
+    add_shortcut( "rist", "tr06" )
+
+    set_callbacks( Open, Close )
+
+vlc_module_end ()
diff --git a/modules/access/rist.h b/modules/access/rist.h
new file mode 100644
index 0000000000..256c1fcf63
--- /dev/null
+++ b/modules/access/rist.h
@@ -0,0 +1,335 @@
+/*****************************************************************************
+ * rist.h: RIST (Reliable Internet Stream Transport) helper
+ *****************************************************************************
+ * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
+ * Copyright (C) 2018, SipRadius LLC
+ *
+ * Authors: Sergio Ammirata <sergio at ammirata.net>
+ *          Daniele Lacamera <root at danielinux.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#include <stdint.h>
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+#include <errno.h>
+#include <assert.h>
+
+/*****************************************************************************
+ * Public API
+ *****************************************************************************/
+
+/* RIST */
+
+/* RTP header format (RFC 3550) */
+/*
+   0                   1                   2                   3
+   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |V=2|P|X|  CC   |M|     PT      |       sequence number         |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |                           timestamp                           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |           synchronization source (SSRC) identifier            |
+   +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
+   |            contributing source (CSRC) identifiers             |
+   |                             ....                              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+
+#define RIST_QUEUE_SIZE 65536
+#define RTP_PKT_SIZE (1472)
+
+#define RTCP_INTERVAL 75 /*ms*/
+
+#define SEVENTY_YEARS_OFFSET (2208988800ULL)
+#define MAX_NACKS 32
+#define MAX_CNAME 128
+#define RTCP_EMPTY_RR_SIZE 8
+
+#define RTCP_PT_RTPFR             204
+
+struct rtp_pkt {
+    uint32_t rtp_ts;
+    struct block_t *buffer;
+};
+
+/* RIST NACK header format  */
+/*
+   0                   1                   2                   3
+   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |       SNBase low bits         |        Length recovery        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |E| PT recovery |                    Mask                       |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |                          TS recovery                          |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |N|D|type |index|    Offset     |      NA       |SNBase ext bits|
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+*/
+
+struct rist_flow {
+    uint8_t reset;
+    struct rtp_pkt *buffer;
+    uint32_t qsize;
+    uint32_t last_out;
+    uint32_t ssrc;
+    char cname[MAX_CNAME];
+    struct sockaddr_storage peer_sockaddr;
+    socklen_t peer_socklen;
+    uint16_t ri,
+        wi;
+    int fd_in;
+    int fd_out;
+    int fd_rtcp;
+    int fd_nack;
+    uint8_t nacks_retries[RIST_QUEUE_SIZE];
+    uint32_t hi_timestamp;
+    uint64_t feedback_time;
+    uint32_t latency;
+    uint32_t rtp_latency;
+    uint32_t retry_interval;
+    uint32_t reorder_buffer;
+    uint8_t max_retries;
+    uint32_t packets_count;
+    uint32_t bytes_count;
+};
+
+static inline uint16_t rtcp_fb_nack_get_range_start(const uint8_t *p_rtcp_fb_nack)
+{
+    return (p_rtcp_fb_nack[0] << 8) | p_rtcp_fb_nack[1];
+}
+
+static inline uint16_t rtcp_fb_nack_get_range_extra(const uint8_t *p_rtcp_fb_nack)
+{
+    return (p_rtcp_fb_nack[2] << 8) | p_rtcp_fb_nack[3];
+}
+
+static inline void rtcp_fb_nack_set_range_start(uint8_t *p_rtcp_fb_nack,
+                                              uint16_t start)
+{
+    p_rtcp_fb_nack[0] = (start >> 8) & 0xff;
+    p_rtcp_fb_nack[1] = start & 0xff;
+}
+
+static inline void rtcp_fb_nack_set_range_extra(uint8_t *p_rtcp_fb_nack,
+                                                 uint16_t extra)
+{
+    p_rtcp_fb_nack[2] = (extra >> 8) & 0xff;
+    p_rtcp_fb_nack[3] = extra & 0xff;
+}
+
+static inline void populate_cname(int fd, char *identifier)
+{
+    /* Set the CNAME Identifier as host at ip:port and fallback to hostname if needed */
+    char hostname[MAX_CNAME];
+    struct sockaddr_storage peer_sockaddr;
+    int name_length = 0;
+    socklen_t peer_socklen;
+    int ret_hostname = gethostname(hostname, MAX_CNAME);
+    if (ret_hostname == -1)
+        snprintf(hostname, MAX_CNAME, "UnknownHost");
+    int ret_sockname = getsockname(fd, (struct sockaddr *)&peer_sockaddr, &peer_socklen);
+    if (ret_sockname == 0)
+    {
+        struct sockaddr *peer = (struct sockaddr *)&peer_sockaddr;
+        if (peer->sa_family == AF_INET) {
+            struct sockaddr_in *xin = (void*)peer;
+            name_length = snprintf(identifier, MAX_CNAME, "%s@%s:%u", hostname,
+                            inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
+            if (name_length >= MAX_CNAME)
+                identifier[MAX_CNAME-1] = 0;
+        } else if (peer->sa_family == AF_INET6) {
+            struct sockaddr_in6 *xin6 = (void*)peer;
+            char str[INET6_ADDRSTRLEN];
+            inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
+            name_length = snprintf(identifier, MAX_CNAME, "%s@%s:%u", hostname,
+                            str, ntohs(xin6->sin6_port));
+            if (name_length >= MAX_CNAME)
+                identifier[MAX_CNAME-1] = 0;
+        }
+    }
+    if (name_length == 0)
+    {
+        name_length = snprintf(identifier, MAX_CNAME, "%s", hostname);
+        if (name_length >= MAX_CNAME)
+            identifier[MAX_CNAME-1] = 0;
+    }
+}
+
+static inline uint32_t rtp_get_ts( vlc_tick_t i_pts )
+{
+    unsigned i_clock_rate = 90000;
+    /* This is an overflow-proof way of doing:
+     * return i_pts * (int64_t)i_clock_rate / CLOCK_FREQ;
+     *
+     * NOTE: this plays nice with offsets because the (equivalent)
+     * calculations are linear. */
+    lldiv_t q = lldiv(i_pts, CLOCK_FREQ);
+    return q.quot * (int64_t)i_clock_rate
+          + q.rem * (int64_t)i_clock_rate / CLOCK_FREQ;
+}
+
+static inline vlc_tick_t ts_get_from_rtp( uint32_t i_rtp_ts )
+{
+    unsigned i_clock_rate = 90000;
+    return (vlc_tick_t)i_rtp_ts * (vlc_tick_t)(CLOCK_FREQ/i_clock_rate);
+}
+
+static inline ssize_t rist_ReadFrom_i11e(int fd, void *buf, size_t len, struct sockaddr *peer, 
+    socklen_t *slen)
+{
+    ssize_t ret = -1;
+
+    if (peer == NULL)
+        ret = vlc_recv_i11e(fd, buf, len, 0);
+    else
+        ret = vlc_recvfrom_i11e(fd, buf, len, 0, peer, slen);
+
+    if (ret == -1)
+    {
+        switch (errno)
+        {
+        case EAGAIN:
+        case EINTR:
+            if (vlc_killed())
+                return ret;
+
+            /* retry one time */
+            if (peer == NULL)
+                ret = vlc_recv_i11e(fd, buf, len, 0);
+            else
+                ret = vlc_recvfrom_i11e(fd, buf, len, 0, peer, slen);
+        default:
+            break;
+        }
+    }
+    return ret;
+}
+
+static inline ssize_t rist_Read_i11e(int fd, void *buf, size_t len)
+{
+    return rist_ReadFrom_i11e(fd, buf, len, NULL, NULL);
+}
+
+static inline ssize_t rist_ReadFrom(int fd, void *buf, size_t len, struct sockaddr *peer, 
+    socklen_t *slen)
+{
+    ssize_t ret = -1;
+
+    if (peer == NULL)
+        ret = recv(fd, buf, len, 0);
+    else
+        ret = recvfrom(fd, buf, len, 0, peer, slen);
+
+    if (ret == -1)
+    {
+        switch (errno)
+        {
+        case EAGAIN:
+        case EINTR:
+            /* retry one time */
+            if (peer == NULL)
+                ret = recv(fd, buf, len, 0);
+            else
+                ret = recvfrom(fd, buf, len, 0, peer, slen);
+        default:
+            break;
+        }
+    }
+    return ret;
+}
+
+static inline ssize_t rist_Read(int fd, void *buf, size_t len)
+{
+    return rist_ReadFrom(fd, buf, len, NULL, NULL);
+}
+
+static inline ssize_t rist_WriteTo_i11e(int fd, const void *buf, size_t len, 
+    const struct sockaddr *peer, socklen_t slen)
+{
+#ifdef _WIN32
+    # define ENOBUFS      WSAENOBUFS
+    # define EAGAIN       WSAEWOULDBLOCK
+    # define EWOULDBLOCK  WSAEWOULDBLOCK
+#endif
+    ssize_t r = -1;
+    if (slen == 0)
+        r = vlc_send_i11e( fd, buf, len, 0 );
+    else
+        r = vlc_sendto_i11e( fd, buf, len, 0, peer, slen );
+    if( r == -1
+        && net_errno != EAGAIN && net_errno != EWOULDBLOCK
+        && net_errno != ENOBUFS && net_errno != ENOMEM )
+    {
+        int type;
+        getsockopt( fd, SOL_SOCKET, SO_TYPE,
+                    &type, &(socklen_t){ sizeof(type) });
+        if( type == SOCK_DGRAM )
+        {
+            /* ICMP soft error: ignore and retry */
+            if (slen == 0)
+                r = vlc_send_i11e( fd, buf, len, 0 );
+            else
+                r = vlc_sendto_i11e( fd, buf, len, 0, peer, slen );
+        }
+    }
+    return r;
+}
+
+static inline ssize_t rist_Write_i11e(int fd, const void *buf, size_t len)
+{
+    return rist_WriteTo_i11e(fd, buf, len, NULL, 0);
+}
+
+static inline ssize_t rist_WriteTo(int fd, const void *buf, size_t len, const struct sockaddr *peer, 
+    socklen_t slen)
+{
+#ifdef _WIN32
+    # define ENOBUFS      WSAENOBUFS
+    # define EAGAIN       WSAEWOULDBLOCK
+    # define EWOULDBLOCK  WSAEWOULDBLOCK
+#endif
+    ssize_t r = -1;
+    if (slen == 0)
+        r = send( fd, buf, len, 0 );
+    else
+        r = sendto( fd, buf, len, 0, peer, slen );
+    if( r == -1
+        && net_errno != EAGAIN && net_errno != EWOULDBLOCK
+        && net_errno != ENOBUFS && net_errno != ENOMEM )
+    {
+        int type;
+        getsockopt( fd, SOL_SOCKET, SO_TYPE,
+                    &type, &(socklen_t){ sizeof(type) });
+        if( type == SOCK_DGRAM )
+        {
+            /* ICMP soft error: ignore and retry */
+            if (slen == 0)
+                r = send( fd, buf, len, 0 );
+            else
+                r = sendto( fd, buf, len, 0, peer, slen );
+        }
+    }
+    return r;
+}
+
+static inline ssize_t rist_Write(int fd, const void *buf, size_t len)
+{
+    return rist_WriteTo(fd, buf, len, NULL, 0);
+}
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 504e3ffd4b..d158aafb92 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -198,6 +198,8 @@ modules/access/oss.c
 modules/access/pulse.c
 modules/access/qtsound.m
 modules/access/rdp.c
+modules/access/rist.h
+modules/access/rist.c
 modules/access/rtp/rtp.c
 modules/access/samba.c
 modules/access/satip.c



More information about the vlc-commits mailing list