[vlc-commits] access_out: add RIST module

Sergio Ammirata git at videolan.org
Mon Nov 5 18:30:05 CET 2018


vlc | branch: master | Sergio Ammirata <sergio at ammirata.net> | Mon Nov  5 09:44:59 2018 -0500| [b504fabcedafc025a3e664a215146a34f746f2e6] | committer: Thomas Guillem

access_out: add RIST module

RIST stands for the Reliable Internet Stream Transport protocol.

The implementation follows the Video Services Forum (VSF) Technical
Recommendation TR-06 which defines an ARQ based UDP transmission protocol
for real-time streaming over lossy networks (internet, wifi, etc).

Signed-off-by: Thomas Guillem <thomas at gllm.fr>

> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=b504fabcedafc025a3e664a215146a34f746f2e6
---

 NEWS                              |   3 +
 modules/MODULES_LIST              |   1 +
 modules/access_output/Makefile.am |   7 +
 modules/access_output/rist.c      | 799 ++++++++++++++++++++++++++++++++++++++
 po/POTFILES.in                    |   1 +
 5 files changed, 811 insertions(+)

diff --git a/NEWS b/NEWS
index 6115713dcd..1f2223294b 100644
--- a/NEWS
+++ b/NEWS
@@ -31,6 +31,9 @@ Access:
  * Enable SMB2 / SMB3 support on mobile ports with libsmb2
  * Added support for the RIST (Reliable Internet Stream Transport) Protocol
 
+Access output:
+ * Added support for the RIST (Reliable Internet Stream Transport) Protocol
+
 Video output:
  * Remove aa plugin
  * Remove evas plugin
diff --git a/modules/MODULES_LIST b/modules/MODULES_LIST
index a0f4d54f64..2419adf9dc 100644
--- a/modules/MODULES_LIST
+++ b/modules/MODULES_LIST
@@ -13,6 +13,7 @@ $Id$
  * access_output_file: File access_output module
  * access_output_http: HTTP Network access module
  * access_output_livehttp: Live HTTP stream output
+ * access_output_rist: RIST (Reliable Internet Stream Transport) access_output module
  * access_output_shout: Shoutcast access output
  * access_output_srt: SRT (Secure Reliable Transport) access_output module
  * access_output_udp: UDP Network access_output module
diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
index 583e4abcd0..184f3e4c81 100644
--- a/modules/access_output/Makefile.am
+++ b/modules/access_output/Makefile.am
@@ -34,3 +34,10 @@ libaccess_output_srt_plugin_la_LIBADD = $(SRT_LIBS)
 libaccess_output_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
 access_out_LTLIBRARIES += $(LTLIBaccess_output_srt)
 EXTRA_LTLIBRARIES += libaccess_output_srt_plugin.la
+
+### RIST ###
+libaccess_output_rist_plugin_la_SOURCES = access_output/rist.c access/rist.h
+libaccess_output_rist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
+if HAVE_BITSTREAM
+access_out_LTLIBRARIES += libaccess_output_rist_plugin.la
+endif
diff --git a/modules/access_output/rist.c b/modules/access_output/rist.c
new file mode 100644
index 0000000000..feff01f3c9
--- /dev/null
+++ b/modules/access_output/rist.c
@@ -0,0 +1,799 @@
+/*****************************************************************************
+ * rist.c: RIST (Reliable Internet Stream Transport) output module
+ *****************************************************************************
+ * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
+ * Copyright (C) 2018, SipRadius LLC
+ *
+ * Authors: Sergio Ammirata <sergio at ammirata.net>
+ *          Daniele Lacamera <root at danielinux.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_common.h>
+#include <vlc_interrupt.h>
+#include <vlc_fs.h>
+#include <vlc_plugin.h>
+#include <vlc_sout.h>
+#include <vlc_block.h>
+#include <vlc_network.h>
+#include <vlc_threads.h>
+#include <vlc_rand.h>
+#ifdef HAVE_POLL
+#include <poll.h>
+#endif
+#include <sys/time.h>
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+#include <bitstream/ietf/rtcp_rr.h>
+#include <bitstream/ietf/rtcp_sr.h>
+#include <bitstream/ietf/rtcp_fb.h>
+#include <bitstream/ietf/rtcp_sdes.h>
+#include <bitstream/ietf/rtp.h>
+
+#include "../access/rist.h"
+
+/* Uncomment the following to introduce induced packet loss for TESTING purposes only */
+/*#define TEST_PACKET_LOSS*/
+
+/* The default target packet size */
+#define RIST_TARGET_PACKET_SIZE 1328
+/* The default caching delay for output data */
+#define DEFAULT_CACHING_DELAY 50
+/* The default buffer size in ms */
+#define DEFAULT_BUFFER_SIZE 0
+/* Calculate and print stats once per second */
+#define STATS_INTERVAL 1000 /*ms*/
+
+#define MPEG_II_TRANSPORT_STREAM (0x21)
+#define RIST_DEFAULT_PORT 1968
+
+#define SOUT_CFG_PREFIX "sout-rist-"
+/* Option names (without SOUT_CFG_PREFIX) handed to config_ChainParse() in Open(). */
+static const char *const ppsz_sout_options[] = {
+    "packet-size",
+    "caching",
+    "buffer-size",
+    "ssrc",
+    NULL
+};
+/* Per-instance state of the RIST access output (allocated in Open()). */
+typedef struct
+{
+    struct       rist_flow *flow;          /* sockets, cname and retransmission queue */
+    uint16_t     rtp_counter;              /* sequence number SendtoFIFO() stamps next */
+    char         receiver_name[MAX_CNAME]; /* name copied from received SDES packets */
+    uint64_t     last_rtcp_tx;             /* vlc_tick_now() of the last rist_rtcp_send() */
+    vlc_thread_t ristthread;               /* runs rist_thread() */
+    vlc_thread_t senderthread;             /* runs ThreadSend() */
+    size_t       i_packet_size;            /* value of the "packet-size" option */
+    bool         b_mtu_warning;            /* set once Write() has warned about oversized blocks */
+    vlc_mutex_t  lock;                     /* held around flow queue and counter accesses */
+    vlc_mutex_t  fd_lock;                  /* held around rist_Write() on flow->fd_out */
+    block_t      *p_pktbuffer;             /* packet being assembled by Write() */
+    uint64_t     i_ticks_caching;          /* "caching" option converted with VLC_TICK_FROM_MS */
+    uint32_t     ssrc;                     /* RTP SSRC, lowest bit cleared in Open() */
+    block_fifo_t *p_fifo;                  /* packets queued by SendtoFIFO() for ThreadSend() */
+    /* stats variables */
+    uint64_t     i_last_stat;              /* time the stats were last reset in ThreadSend() */
+    uint32_t     i_retransmit_packets;     /* retransmissions since the last reset */
+    uint32_t     i_total_packets;          /* packets sent since the last reset */
+} sout_access_out_sys_t;
+/* Allocate a zeroed tx flow and its RIST_QUEUE_SIZE-slot packet queue; NULL on failure. */
+static struct rist_flow *rist_init_tx(void)
+{
+    struct rist_flow *flow = calloc(1, sizeof(*flow));
+    if (!flow)
+        return NULL;
+
+    flow->reset = 1;
+    flow->fd_out = flow->fd_rtcp = -1; /* 0 is a valid fd: mark both sockets unopened */
+    flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
+    if ( unlikely( flow->buffer == NULL ) )
+    {
+        free(flow);
+        return NULL;
+    }
+    return flow;
+}
+/* Create a tx flow: data socket on i_dst_port, RTCP socket on i_dst_port + 1.
+ * Returns NULL on failure, after releasing everything acquired so far. */
+static struct rist_flow *rist_udp_transmitter(sout_access_out_t *p_access, char *psz_dst_server, 
+    int i_dst_port)
+{
+    struct rist_flow *flow = rist_init_tx();
+    if (!flow)
+        return NULL;
+
+    flow->fd_out = net_ConnectDgram(p_access, psz_dst_server, i_dst_port, -1, IPPROTO_UDP );
+    if (flow->fd_out < 0) {
+        msg_Err( p_access, "cannot open output socket" );
+        goto error;
+    }
+    flow->fd_rtcp = net_ConnectDgram(p_access, psz_dst_server, i_dst_port + 1, -1, IPPROTO_UDP );
+    if (flow->fd_rtcp < 0) {
+        msg_Err( p_access, "cannot open nack socket" );
+        net_Close(flow->fd_out);
+        goto error;
+    }
+    populate_cname(flow->fd_rtcp, flow->cname);
+    msg_Info(p_access, "our cname is %s", flow->cname);
+    return flow;
+error: /* the caller never sees this flow, so it has to be freed here */
+    free(flow->buffer);
+    free(flow);
+    return NULL;
+}
+/* Resend the queued packet stored at index seq, unless it is older than flow->rtp_latency. */
+static void rist_retransmit(sout_access_out_t *p_access, struct rist_flow *flow, uint16_t seq)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    struct rtp_pkt *pkt = &(flow->buffer[seq]); /* process_nack() calls us with p_sys->lock held */
+    if (pkt->buffer == NULL)
+    {
+        msg_Err(p_access, "RIST recovery: missing requested packet %d, buffer not yet full", seq);
+        return;
+    }
+
+    /* Mark SSRC for retransmission (change the last bit of the ssrc to 1) */
+    pkt->buffer->p_buffer[11] |= (1 << 0);
+#ifdef TEST_PACKET_LOSS
+#   warning COMPILED WITH SELF INFLICTED PACKET LOSS
+        if ((flow->packets_count % 14) == 0) {
+            return;
+        }
+#endif
+    uint32_t rtp_age = flow->hi_timestamp - pkt->rtp_ts; /* age relative to the newest packet sent */
+    uint64_t age = ts_get_from_rtp(rtp_age)/1000;
+    if (flow->rtp_latency > 0 && rtp_age > flow->rtp_latency) /* rtp_latency 0: no age limit */
+    {
+        msg_Err(p_access, "   Not Sending Nack #%d, too old (age %"PRId64" ms), current seq is:" \
+            " [%d]. Perhaps you should increase the buffer-size ...", seq, age, flow->wi);
+    }
+    else
+    {
+        msg_Dbg(p_access, "   Sending Nack #%d (age %"PRId64" ms), current seq is: [%d]", 
+            seq, age, flow->wi);
+        p_sys->i_retransmit_packets++;
+        vlc_mutex_lock( &p_sys->fd_lock ); /* fd_out is shared with ThreadSend() */
+        rist_Write(flow->fd_out, pkt->buffer->p_buffer, pkt->buffer->i_buffer);
+        vlc_mutex_unlock( &p_sys->fd_lock );
+    }
+}
+/* Retransmit every packet requested by one range (RTPFR) or bitmask (RTPFB) NACK packet. */
+static void process_nack(sout_access_out_t *p_access, uint8_t  ptype, uint16_t nrecords, 
+    struct rist_flow *flow, uint8_t *pkt)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    int i,j;
+
+    /*msg_Info(p_access, "   Nack (BbRR), %d record(s), Window: [%d:%d-->%d]", nrecords, 
+        flow->ri, flow->wi, flow->wi-flow->ri);*/
+
+    if (ptype == RTCP_PT_RTPFR)
+    {
+        uint8_t pi_ssrc[4];
+        rtcp_fb_get_ssrc_media_src(pkt, pi_ssrc);
+        if (memcmp(pi_ssrc, "RIST", 4) != 0)
+        {
+            /* pi_ssrc is 4 raw bytes without a terminator, so the string must be bounded */
+            msg_Info(p_access, "   Ignoring Nack with name %.4s", (const char *)pi_ssrc);
+            return; /* Ignore app-type not RIST */
+        }
+        for (i = 0; i < (nrecords-2); i++) { /* 4-byte records start at offset 12 */
+            uint16_t missing;
+            uint16_t additional;
+            uint8_t *rtp_nack_record = (pkt + 12 + i * 4);
+            missing = rtcp_fb_nack_get_range_start(rtp_nack_record);
+            additional = rtcp_fb_nack_get_range_extra(rtp_nack_record);
+            /*msg_Info(p_access, "   Nack (Range), %d, current seq is: [%d]", missing, flow->wi);*/
+            vlc_mutex_lock( &p_sys->lock );
+            rist_retransmit(p_access, flow, missing);
+            for (j = 0; j < additional; j++) {
+                rist_retransmit(p_access, flow, missing + j + 1);
+            }
+            vlc_mutex_unlock( &p_sys->lock );
+        }
+    }
+    else if (ptype == RTCP_PT_RTPFB)
+    {
+        for (i = 0; i < (nrecords-2); i++) {
+            uint16_t missing;
+            uint16_t bitmask;
+            uint8_t *rtp_nack_record = (pkt + 12 + i * 4);
+            missing = rtcp_fb_nack_get_packet_id(rtp_nack_record);
+            bitmask = rtcp_fb_nack_get_bitmask_lost(rtp_nack_record);
+            /*msg_Info(p_access, "  Nack (Bitmask), %d, current seq is: [%d]", missing, flow->wi);*/
+            vlc_mutex_lock( &p_sys->lock );
+            rist_retransmit(p_access, flow, missing);
+            for (j = 0; j < 16; j++) { /* bit j set: packet missing + j + 1 is lost too */
+                if ((bitmask & (1 << j)) == (1 << j)) {
+                    rist_retransmit(p_access, flow, missing + j + 1);
+                }
+            }
+            vlc_mutex_unlock( &p_sys->lock );
+        }
+    }
+    else
+    {
+        msg_Err(p_access, "   !!! Wrong feedback. Ptype is %02x!=%02x, FMT: %02x", ptype, 
+            RTCP_PT_RTPFR, rtcp_fb_get_fmt(pkt));
+    }
+}
+/* Parse a (possibly compound) RTCP datagram of len bytes and act on each sub-packet. */
+static void rist_rtcp_recv(sout_access_out_t *p_access, struct rist_flow *flow, uint8_t *pkt_raw, 
+    size_t len)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    uint8_t *pkt = pkt_raw;
+    uint8_t  ptype;
+    size_t   processed_bytes = 0;
+    uint16_t records;
+
+    while (processed_bytes < len) {
+        pkt = pkt_raw + processed_bytes;
+        /* safety checks: count exactly the bytes that remain (no "+ 1") */
+        uint16_t bytes_left = len - processed_bytes;
+        if ( bytes_left < 4 )
+        {
+            msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d", 
+                bytes_left);
+            return; 
+        }
+        else if (!rtp_check_hdr(pkt))
+        {
+            msg_Err(p_access, "Malformed feedback packet starting with %02x, ignoring.", pkt[0]);
+            return;
+        }
+
+        ptype =  rtcp_get_pt(pkt);
+        records = rtcp_get_length(pkt);
+        /* 32-bit on purpose: 4 * (1 + 0xffff) wraps a uint16_t to 0 and would loop forever */
+        uint32_t bytes = 4 * (1 + (uint32_t)records);
+        if (bytes > bytes_left)
+        {
+            msg_Err(p_access, "Malformed feedback packet, wrong len %d, expecting %u bytes in the" \
+                " packet, got a buffer of %u bytes. ptype = %d", rtcp_get_length(pkt), bytes, 
+                bytes_left, ptype);
+            return;
+        }
+        switch(ptype) {
+            case RTCP_PT_RTPFR:
+            case RTCP_PT_RTPFB:
+                if (records >= 2) /* anything shorter lacks the 12-byte feedback header */
+                    process_nack(p_access, ptype, records, flow, pkt);
+                break;
+
+            case RTCP_PT_RR:
+            case RTCP_PT_SR:
+                break; /* reports are accepted but not interpreted */
+
+            case RTCP_PT_SDES:
+                {
+                    /* the name must fit in this sub-packet and, terminated, in receiver_name */
+                    uint8_t name_length = 0;
+                    if (bytes >= RTCP_SDES_SIZE)
+                        name_length = rtcp_sdes_get_name_length(pkt);
+                    if (bytes < RTCP_SDES_SIZE || name_length > bytes - RTCP_SDES_SIZE
+                     || name_length >= MAX_CNAME)
+                    {
+                        msg_Err(p_access, "Malformed SDES packet, wrong cname len %u, got a " \
+                            "buffer of %u bytes.", name_length, bytes_left);
+                        return;
+                    }
+                    if (memcmp(pkt + RTCP_SDES_SIZE, p_sys->receiver_name, name_length) != 0
+                     || p_sys->receiver_name[name_length] != '\0')
+                    {
+                        memcpy(p_sys->receiver_name, pkt + RTCP_SDES_SIZE, name_length);
+                        p_sys->receiver_name[name_length] = '\0';
+                        msg_Info(p_access, "Receiver name: %s", p_sys->receiver_name);
+                    }
+                }
+                break;
+
+            default:
+                msg_Err(p_access, "   Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
+        }
+        processed_bytes += bytes;
+    }
+}
+/* Send one compound RTCP packet (sender report followed by an SDES CNAME) on the RTCP socket. */
+static void rist_rtcp_send(sout_access_out_t *p_access)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    struct rist_flow *flow = p_sys->flow;
+    /* +4: rounding the name up to a 32-bit boundary can go slightly past MAX_CNAME */
+    uint8_t rtcp_buf[RTCP_SR_SIZE + RTCP_SDES_SIZE + MAX_CNAME + 4] = { 0 };
+    struct timeval tv;
+    int r;
+    uint16_t namelen = strlen(flow->cname) + 1;
+    gettimeofday(&tv, NULL);
+    /* Populate SR for sender report */
+    uint8_t *p_sr = rtcp_buf;
+    rtp_set_hdr(p_sr);
+    rtcp_sr_set_pt(p_sr);
+    rtcp_sr_set_length(p_sr, 6);
+    rtcp_fb_set_int_ssrc_pkt_sender(p_sr, p_sys->ssrc);
+    rtcp_sr_set_ntp_time_msw(p_sr, tv.tv_sec + SEVENTY_YEARS_OFFSET);
+    /* widen before shifting: tv_usec may be 32 bits wide, and a shift by 32 is then undefined */
+    uint64_t fractions = ((uint64_t)tv.tv_usec << 32) / 1000000ULL;
+    rtcp_sr_set_ntp_time_lsw(p_sr, (uint32_t)fractions);
+    rtcp_sr_set_rtp_time(p_sr, rtp_get_ts(vlc_tick_now()));
+    vlc_mutex_lock( &p_sys->lock );
+    rtcp_sr_set_packet_count(p_sr, flow->packets_count);
+    rtcp_sr_set_octet_count(p_sr, flow->bytes_count);
+    vlc_mutex_unlock( &p_sys->lock );
+
+    /* Populate SDES for sender description */
+    uint8_t *p_sdes = (rtcp_buf + RTCP_SR_SIZE);
+    /* we need to make sure it is a multiple of 4, pad if necessary */
+    if ((namelen - 2) & 0x3)
+        namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
+    rtp_set_hdr(p_sdes);
+    rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
+    rtcp_sdes_set_pt(p_sdes);
+    rtcp_set_length(p_sdes, (namelen >> 2) + 2);
+    rtcp_sdes_set_cname(p_sdes, 1);
+    rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
+    p_sdes += RTCP_SDES_SIZE;
+    strlcpy((char *)p_sdes, flow->cname, namelen);
+
+    /* Send the rtcp message */
+    r = send(flow->fd_rtcp, rtcp_buf, RTCP_SR_SIZE + RTCP_SDES_SIZE + namelen, 0);
+    (void)r;
+}
+/* RTCP thread: polls the RTCP socket for feedback and sends a report every RTCP_INTERVAL ms. */
+static void *rist_thread(void *data)
+{
+    sout_access_out_t *p_access = data;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    uint64_t now;
+    uint8_t pkt[RTP_PKT_SIZE];
+    struct pollfd pfd[1];
+    int ret;
+    int r;
+
+    pfd[0].fd = p_sys->flow->fd_rtcp;
+    pfd[0].events = POLLIN;
+
+    for (;;) {
+        ret = poll(pfd, 1, RTCP_INTERVAL >> 1);
+        int canc = vlc_savecancel(); /* cancellation stays disabled until vlc_restorecancel() */
+        if (ret > 0)
+        {
+            if (pfd[0].revents & POLLIN)
+            {
+                r = rist_Read(p_sys->flow->fd_rtcp, pkt, RTP_PKT_SIZE);
+                if (r == RTP_PKT_SIZE) {
+                    msg_Err(p_access, "Rist RTCP messsage is too big (%d bytes) and was probably " \
+                        "cut, please keep it under %d bytes", r, RTP_PKT_SIZE);
+                }
+                if (unlikely(r == -1)) {
+                    msg_Err(p_access, "socket %d error: %s\n", p_sys->flow->fd_rtcp, 
+                        vlc_strerror_c(errno)); /* errno is not a getaddrinfo() error code */
+                }
+                else {
+                    rist_rtcp_recv(p_access, p_sys->flow, pkt, r);
+                }
+            }
+        }
+
+        /* And, in any case: */
+        now = vlc_tick_now();
+        if ((now - p_sys->last_rtcp_tx) > VLC_TICK_FROM_MS(RTCP_INTERVAL))
+        {
+            rist_rtcp_send(p_access);
+            p_sys->last_rtcp_tx = now;
+        }
+        vlc_restorecancel (canc);
+    }
+
+    return NULL;
+}
+
+/****************************************************************************
+ * RTP send: thread that takes packets from the FIFO, sends them at their dts
+ ****************************************************************************/
+static void* ThreadSend( void *data )
+{
+    sout_access_out_t *p_access = data;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    vlc_tick_t i_caching = p_sys->i_ticks_caching;
+    struct rist_flow *flow = p_sys->flow;
+
+    for (;;)
+    {
+        ssize_t len = 0;
+        uint16_t seq = 0;
+        uint32_t pkt_ts = 0;
+        block_t *out = block_FifoGet( p_sys->p_fifo );
+
+        block_cleanup_push( out ); /* presumably releases out if we are cancelled while waiting */
+        vlc_tick_wait (out->i_dts + i_caching);
+        vlc_cleanup_pop();
+
+        len = out->i_buffer;
+        int canc = vlc_savecancel();
+
+        seq = rtp_get_seqnum(out->p_buffer); /* header fields were written by SendtoFIFO() */
+        pkt_ts = rtp_get_timestamp(out->p_buffer);
+
+        vlc_mutex_lock( &p_sys->fd_lock ); /* fd_out is shared with rist_retransmit() */
+#ifdef TEST_PACKET_LOSS
+#   warning COMPILED WITH SELF INFLICTED PACKET LOSS
+        if ((seq % 14) == 0) {
+            /*msg_Err(p_access, "Dropped packet with seq number %d ...", seq);*/
+        }
+        else if (rist_Write(flow->fd_out, out->p_buffer, len) != len) {
+            msg_Err(p_access, "Error sending data packet after 2 tries ...");
+        }
+#else
+        if (rist_Write(flow->fd_out, out->p_buffer, len) != len) {
+            msg_Err(p_access, "Error sending data packet after 2 tries ...");
+        }
+#endif
+        vlc_mutex_unlock( &p_sys->fd_lock );
+
+        /* Insert Into Queue */
+        vlc_mutex_lock( &p_sys->lock );
+        /* Always replace the existing one with the new one */
+        struct rtp_pkt *pkt;
+        pkt = &(flow->buffer[seq]); /* the queue is indexed by RTP sequence number */
+        if (pkt->buffer)
+        {
+            block_Release(pkt->buffer);
+            pkt->buffer = NULL;
+        }
+        pkt->rtp_ts = pkt_ts;
+        pkt->buffer = out; /* the queue now owns out; it is released on reuse or in Clean() */
+
+        if (flow->reset == 1)
+        {
+            msg_Info(p_access, "Traffic detected");
+            /* First packet in the queue */
+            flow->reset = 0;
+        }
+        flow->wi = seq;
+        flow->hi_timestamp = pkt_ts;
+        /* Stats for RTCP feedback packets */
+        flow->packets_count++;
+        flow->bytes_count += len;
+        flow->last_out = seq;
+        vlc_mutex_unlock( &p_sys->lock );
+
+        /* We print out the stats once per second */
+        /* NOTE(review): rist_retransmit() increments i_retransmit_packets on the RTCP
+         * thread, while it is read and reset here without holding p_sys->lock */
+        uint64_t now = vlc_tick_now();
+        uint64_t interval = (now - p_sys->i_last_stat);
+        if ( interval > VLC_TICK_FROM_MS(STATS_INTERVAL) )
+        {
+            if (p_sys->i_retransmit_packets > 0)
+            {
+                float quality = 100;
+                if (p_sys->i_total_packets > 0)
+                    quality = (float)100 - (float)100*(float)(p_sys->i_retransmit_packets)
+                        /(float)p_sys->i_total_packets;
+                msg_Info(p_access, "STATS: Total %u, Retransmitted %u, Link Quality %.2f%%", 
+                    p_sys->i_total_packets, p_sys->i_retransmit_packets, quality);
+            }
+            p_sys->i_last_stat = now;
+            p_sys->i_retransmit_packets = 0;
+            p_sys->i_total_packets = 0;
+        }
+        p_sys->i_total_packets++;
+        vlc_restorecancel (canc);
+    }
+    return NULL;
+}
+/* Stamp a fresh RTP header on buffer and queue a copy of it for ThreadSend(). */
+static void SendtoFIFO( sout_access_out_t *p_access, block_t *buffer )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    /* Set fresh rtp header data */
+    uint8_t *bufhdr = buffer->p_buffer;
+    rtp_set_hdr(bufhdr);
+    rtp_set_type(bufhdr, MPEG_II_TRANSPORT_STREAM);
+    rtp_set_seqnum(bufhdr, p_sys->rtp_counter);
+    rtp_set_int_ssrc(bufhdr, p_sys->ssrc);
+    rtp_set_timestamp(bufhdr, rtp_get_ts(buffer->i_dts));
+
+    block_t *pkt = block_Duplicate(buffer);
+    if (unlikely(pkt == NULL))
+        return; /* out of memory: drop the packet and keep its sequence number for the next */
+    p_sys->rtp_counter++;
+    block_FifoPut( p_sys->p_fifo, pkt );
+}
+/* Repack the block chain into RTP packets of at most i_packet_size bytes, header
+ * included, and queue them. Releases the chain; returns the payload bytes consumed. */
+static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    int i_len = 0;
+
+    if( unlikely(p_sys->i_packet_size <= RTP_HEADER_SIZE) )
+    {   /* no room for payload: the repacking loop could never make progress */
+        block_ChainRelease( p_buffer );
+        return VLC_EGENERIC;
+    }
+
+    while( p_buffer )
+    {
+        block_t *p_next;
+        int i_block_split = 0;
+
+        if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_packet_size )
+        {
+            msg_Warn( p_access, "Buffer data size (%zu) > configured packet size (%zu), you " \
+                "should probably increase the configured packet size", p_buffer->i_buffer, 
+                p_sys->i_packet_size );
+            p_sys->b_mtu_warning = true;
+        }
+
+        /* Pending data plus this block will not fit: flush, unless nothing is pending */
+        if( p_sys->p_pktbuffer->i_buffer > RTP_HEADER_SIZE
+         && p_sys->p_pktbuffer->i_buffer + p_buffer->i_buffer > p_sys->i_packet_size )
+        {
+            SendtoFIFO(p_access, p_sys->p_pktbuffer);
+            p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
+        }
+
+        i_len += p_buffer->i_buffer;
+        while( p_buffer->i_buffer )
+        {
+            /* p_pktbuffer holds i_packet_size bytes in all: copy only what is still free */
+            size_t i_write = __MIN( p_buffer->i_buffer,
+                                    p_sys->i_packet_size - p_sys->p_pktbuffer->i_buffer );
+            i_block_split++;
+
+            if( p_sys->p_pktbuffer->i_buffer == RTP_HEADER_SIZE )
+                p_sys->p_pktbuffer->i_dts = p_buffer->i_dts;
+
+            memcpy( p_sys->p_pktbuffer->p_buffer + p_sys->p_pktbuffer->i_buffer,
+                    p_buffer->p_buffer, i_write );
+
+            p_sys->p_pktbuffer->i_buffer += i_write;
+            p_buffer->p_buffer += i_write;
+            p_buffer->i_buffer -= i_write;
+
+            /*  Flush if we reached the target size for the case of block size < target packet size.
+             *  Also flush when we are in block_split > 1 for the case when the block_size is
+             *  larger than the packet-size because we need to continue the inner loop */
+            if( p_sys->p_pktbuffer->i_buffer == p_sys->i_packet_size || i_block_split > 1 )
+            {
+                SendtoFIFO(p_access, p_sys->p_pktbuffer);
+                p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
+            }
+        }
+
+        p_next = p_buffer->p_next;
+        block_Release( p_buffer );
+        p_buffer = p_next;
+    }
+
+    return i_len;
+}
+/* Control callback of the access output.
+ *
+ * The only query answered is ACCESS_OUT_CONTROLS_PACE, for which the bool
+ * pointed to by the next variadic argument is set to false. Every other
+ * query is rejected with VLC_EGENERIC. */
+static int Control( sout_access_out_t *p_access, int i_query, va_list args )
+{
+    VLC_UNUSED( p_access );
+
+    if( i_query != ACCESS_OUT_CONTROLS_PACE )
+    {
+        /* unknown or unsupported query */
+        return VLC_EGENERIC;
+    }
+
+    bool *pb_pace = va_arg( args, bool * );
+    *pb_pace = false;
+
+    return VLC_SUCCESS;
+}
+/* Release everything owned by p_sys: FIFO, sockets, queued packets, flow, mutexes, buffer. */
+static void Clean( sout_access_out_t *p_access )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+
+    if( likely(p_sys->p_fifo != NULL) )
+        block_FifoRelease( p_sys->p_fifo );
+
+    if ( p_sys->flow )
+    {
+        if (p_sys->flow->fd_out >= 0) {
+            net_Close (p_sys->flow->fd_out);
+        }
+        if (p_sys->flow->fd_rtcp >= 0) { /* test the descriptor that is actually closed */
+            net_Close (p_sys->flow->fd_rtcp);
+        }
+        for (int i=0; i<RIST_QUEUE_SIZE; i++) {
+            struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
+            if (pkt->buffer)
+            {
+                block_Release(pkt->buffer);
+                pkt->buffer = NULL;
+            }
+        }
+        free(p_sys->flow->buffer);
+        free(p_sys->flow);
+    }
+
+    vlc_mutex_destroy( &p_sys->lock );
+    vlc_mutex_destroy( &p_sys->fd_lock );
+    if (p_sys->p_pktbuffer)
+        block_Release(p_sys->p_pktbuffer);
+}
+/* Module close: stop both worker threads, then release everything through Clean(). */
+static void Close( vlc_object_t * p_this )
+{
+    sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+
+    vlc_cancel(p_sys->ristthread);
+    vlc_cancel(p_sys->senderthread);
+    /* both threads are joined before Clean() frees the flow and FIFO they use */
+    vlc_join(p_sys->ristthread, NULL);
+    vlc_join(p_sys->senderthread, NULL);
+
+    Clean( p_access );
+}
+/* Module open: parse "host[:port]" from psz_path, connect the flow and start both threads. */
+static int Open( vlc_object_t *p_this )
+{
+    sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
+    sout_access_out_sys_t   *p_sys = NULL;
+
+    if (var_Create ( p_access, "dst-port", VLC_VAR_INTEGER )
+     || var_Create ( p_access, "src-port", VLC_VAR_INTEGER )
+     || var_Create ( p_access, "dst-addr", VLC_VAR_STRING )
+     || var_Create ( p_access, "src-addr", VLC_VAR_STRING ) )
+    {
+        msg_Err( p_access, "Valid network information is required." );
+        return VLC_ENOMEM;
+    }
+
+    config_ChainParse( p_access, SOUT_CFG_PREFIX, ppsz_sout_options, p_access->p_cfg );
+
+    p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
+    if( unlikely( p_sys == NULL ) )
+        return VLC_ENOMEM;
+
+    /* Clean() reads p_access->p_sys and destroys both mutexes, so all three must be
+     * valid before the first "goto failed". The calloc above zeroed every other field. */
+    p_access->p_sys = p_sys;
+    vlc_mutex_init( &p_sys->lock );
+    vlc_mutex_init( &p_sys->fd_lock );
+
+    int i_dst_port = RIST_DEFAULT_PORT;
+    char *psz_dst_addr;
+    char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
+    if( !psz_dst_addr )
+        goto failed;
+
+    if ( psz_parser[0] == '[' )
+        psz_parser = strchr( psz_parser, ']' );
+
+    psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
+    if ( psz_parser != NULL )
+    {
+        *psz_parser++ = '\0';
+        i_dst_port = atoi( psz_parser );
+    }
+
+    msg_Info(p_access, "Connecting RIST output to %s:%d and %s:%d", psz_dst_addr, i_dst_port, 
+        psz_dst_addr, i_dst_port+1);
+    struct rist_flow *flow = rist_udp_transmitter(p_access, psz_dst_addr, i_dst_port);
+    free (psz_dst_addr);
+    if (!flow)
+        goto failed;
+
+    p_sys->flow = flow;
+    flow->latency = var_InheritInteger(p_access, SOUT_CFG_PREFIX "buffer-size");
+    flow->rtp_latency = rtp_get_ts(VLC_TICK_FROM_MS(flow->latency));
+    p_sys->ssrc = var_InheritInteger(p_access, SOUT_CFG_PREFIX "ssrc");
+    if (p_sys->ssrc == 0) {
+        vlc_rand_bytes(&p_sys->ssrc, 4);
+    }
+    /* Last bit of ssrc must be 0 for normal data and 1 for retries */
+    p_sys->ssrc &= ~(1 << 0);
+    msg_Info(p_access, "SSRC: 0x%08X", p_sys->ssrc);
+    p_sys->i_ticks_caching = VLC_TICK_FROM_MS(var_InheritInteger( p_access, 
+        SOUT_CFG_PREFIX "caching"));
+    int64_t i_packet_size = var_InheritInteger(p_access, SOUT_CFG_PREFIX "packet-size" );
+    if( i_packet_size <= RTP_HEADER_SIZE )
+    {   /* p_pktbuffer must hold the RTP header plus at least one payload byte */
+        msg_Err(p_access, "Invalid packet size %"PRId64, i_packet_size);
+        goto failed;
+    }
+    p_sys->i_packet_size = i_packet_size;
+    p_sys->p_fifo = block_FifoNew();
+    if( unlikely(p_sys->p_fifo == NULL) )
+        goto failed;
+    p_sys->p_pktbuffer = block_Alloc( p_sys->i_packet_size );
+    if( unlikely(p_sys->p_pktbuffer == NULL) )
+        goto failed;
+    p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
+
+    if( vlc_clone(&p_sys->senderthread, ThreadSend, p_access, VLC_THREAD_PRIORITY_HIGHEST ) )
+    {
+        msg_Err(p_access, "Failed to create sender thread.");
+        goto failed;
+    }
+
+    if (vlc_clone(&p_sys->ristthread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
+    {
+        msg_Err(p_access, "Failed to create worker thread.");
+        vlc_cancel(p_sys->senderthread);
+        vlc_join(p_sys->senderthread, NULL);
+        goto failed;
+    }
+
+    p_access->pf_write = Write;
+    p_access->pf_control = Control;
+    return VLC_SUCCESS;
+
+failed:
+    Clean( p_access );
+    return VLC_EGENERIC;
+}
+
+#define CACHING_TEXT N_("RIST data output caching size (ms)")
+#define CACHING_LONGTEXT N_( \
+    "Having this cache will guarantee that the packets going out are " \
+    "delivered at a spacing determined by the chain timestamps thus ensuring " \
+    "a near jitter free output. Be aware that this setting will also add to " \
+    "the overall latency of the stream." )
+/* Help text for "buffer-size"; the default 0 disables the age check in rist_retransmit(). */
+#define BUFFER_TEXT N_("RIST client side buffer size (ms)")
+#define BUFFER_LONGTEXT N_( \
+    "This must match the buffer size (latency) configured on the server side. If you " \
+    "are not sure, leave the default of 0 which will set it to the maximum " \
+    "value and will use about 100MB of RAM" )
+
+#define SSRC_TEXT N_("SSRC used in RTP output (default is random, i.e. 0)")
+#define SSRC_LONGTEXT N_( \
+    "Use this setting to specify a known SSRC for the RTP header. This is only useful " \
+    "if your receiver acts on it. When using VLC as receiver, it is not." )
+
+/* Module descriptor: a "sout access" named rist/tr06 with four sout-rist-* integer options */
+vlc_module_begin()
+
+    set_shortname( N_("RIST") )
+    set_description( N_("RIST stream output") )
+    set_category( CAT_SOUT )
+    set_subcategory( SUBCAT_SOUT_ACO )
+
+    add_integer( SOUT_CFG_PREFIX "packet-size", RIST_TARGET_PACKET_SIZE,
+            N_("RIST target packet size (bytes)"), NULL, true )
+    add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_CACHING_DELAY,
+            CACHING_TEXT, CACHING_LONGTEXT, true )
+    add_integer( SOUT_CFG_PREFIX "buffer-size", DEFAULT_BUFFER_SIZE,
+            BUFFER_TEXT, BUFFER_LONGTEXT, true )
+    add_integer( SOUT_CFG_PREFIX "ssrc", 0,
+            SSRC_TEXT, SSRC_LONGTEXT, true )
+
+    set_capability( "sout access", 0 )
+    add_shortcut( "rist", "tr06" )
+
+    set_callbacks( Open, Close ) /* Open() and Close() are defined above in this file */
+
+vlc_module_end ()
diff --git a/po/POTFILES.in b/po/POTFILES.in
index d158aafb92..7bcd2ffd0e 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -226,6 +226,7 @@ modules/access_output/dummy.c
 modules/access_output/file.c
 modules/access_output/http.c
 modules/access_output/livehttp.c
+modules/access_output/rist.c
 modules/access_output/shout.c
 modules/access_output/srt.c
 modules/access_output/udp.c



More information about the vlc-commits mailing list