[vlc-commits] access: rist: add support for multicast

Sergio Ammirata git at videolan.org
Wed Mar 20 15:22:51 CET 2019


vlc | branch: master | Sergio Ammirata <sergio at ammirata.net> | Tue Mar 19 08:14:32 2019 -0400| [b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec] | committer: Thomas Guillem

access: rist: add support for multicast

Signed-off-by: Thomas Guillem <thomas at gllm.fr>

> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec
---

 modules/access/rist.c | 150 +++++++++++++++++++++++++++++++++++++-------------
 modules/access/rist.h |  32 ++++++++++-
 2 files changed, 143 insertions(+), 39 deletions(-)

diff --git a/modules/access/rist.c b/modules/access/rist.c
index f6e572b863..e06828c88a 100644
--- a/modules/access/rist.c
+++ b/modules/access/rist.c
@@ -85,7 +85,11 @@ typedef struct
     int              i_max_packet_size;
     int              i_poll_timeout;
     int              i_poll_timeout_current;
-    bool             eof_on_reset;
+    bool             b_ismulticast;
+    bool             b_sendnacks;
+    bool             b_sendblindnacks;
+    bool             b_disablenacks;
+    bool             b_flag_discontinuity;
     block_fifo_t     *p_fifo;
     vlc_mutex_t      lock;
     uint64_t         last_message;
@@ -97,6 +101,7 @@ typedef struct
     float            vbr_ratio;
     uint16_t         vbr_ratio_count;
     uint32_t         i_lost_packets;
+    uint32_t         i_nack_packets;
     uint32_t         i_recovered_packets;
     uint32_t         i_reordered_packets;
     uint32_t         i_total_packets;
@@ -131,7 +136,7 @@ static struct rist_flow *rist_init_rx(void)
     if (!flow)
         return NULL;
 
-    flow->reset = 2;
+    flow->reset = 1;
     flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
 
     if ( unlikely( flow->buffer == NULL ) )
@@ -139,6 +144,10 @@ static struct rist_flow *rist_init_rx(void)
         free(flow);
         return NULL;
     }
+    flow->fd_in = -1;
+    flow->fd_nack = -1;
+    flow->fd_rtcp_m = -1;
+
     return flow;
 }
 
@@ -150,7 +159,7 @@ static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf,
     vlc_mutex_unlock( &lock );
 }
 
-static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url)
+static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url, bool b_ismulticast)
 {
     stream_sys_t *p_sys = p_access->p_sys;
     msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
@@ -166,21 +175,47 @@ static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed
     if (p_sys->flow->fd_in < 0)
     {
         msg_Err( p_access, "cannot open input socket" );
-        return NULL;
+        goto fail;
     }
 
-    p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
-        NULL, 0, IPPROTO_UDP);
+    if (b_ismulticast)
+    {
+        p_sys->flow->fd_rtcp_m = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+            NULL, 0, IPPROTO_UDP);
+        if (p_sys->flow->fd_rtcp_m < 0)
+        {
+            msg_Err( p_access, "cannot open multicast nack socket" );
+            goto fail;
+        }
+        p_sys->flow->fd_nack = net_ConnectDgram(p_access, parsed_url->psz_host,
+            parsed_url->i_port + 1, -1, IPPROTO_UDP );
+    }
+    else
+    {
+        p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+            NULL, 0, IPPROTO_UDP);
+    }
     if (p_sys->flow->fd_nack < 0)
     {
         msg_Err( p_access, "cannot open nack socket" );
-        return NULL;
+        goto fail;
     }
 
     populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
     msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
 
     return  p_sys->flow;
+
+fail:
+    if (p_sys->flow->fd_in != -1)
+        vlc_close(p_sys->flow->fd_in);
+    if (p_sys->flow->fd_nack != -1)
+        vlc_close(p_sys->flow->fd_nack);
+    if (p_sys->flow->fd_rtcp_m != -1)
+        vlc_close(p_sys->flow->fd_rtcp_m);
+    free(p_sys->flow->buffer);
+    free(p_sys->flow);
+    return NULL;
 }
 
 static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
@@ -263,8 +298,9 @@ static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uin
     len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
 
     /* Write to Socket */
-    rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
-        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
+        rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
+            (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
     free(buf);
     buf = NULL;
 }
@@ -302,8 +338,9 @@ static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uin
     len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
 
     /* Write to Socket */
-    rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
-        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
+        rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
+            (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
     free(buf);
     buf = NULL;
 }
@@ -361,6 +398,7 @@ static void send_nacks(stream_t *p_access, struct rist_flow *flow)
     }
     if (nacks_len > 0)
     {
+        p_sys->i_nack_packets += nacks_len;
         block_t *pkt_nacks = block_Alloc(nacks_len * 2);
         if (pkt_nacks)
         {
@@ -485,6 +523,10 @@ static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_
 
             case RTCP_PT_SDES:
                 {
+                    if (p_sys->b_sendnacks == false)
+                        p_sys->b_sendnacks = true;
+                    if (p_sys->b_ismulticast)
+                        return;
                     /* Check for changes in source IP address or port */
                     int8_t name_length = rtcp_sdes_get_name_length(buf);
                     if (name_length > bytes_left)
@@ -535,6 +577,10 @@ static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_
                 break;
 
             case RTCP_PT_SR:
+                if (p_sys->b_sendnacks == false)
+                    p_sys->b_sendnacks = true;
+                if (p_sys->b_ismulticast)
+                        return;
                 break;
 
             default:
@@ -566,17 +612,8 @@ static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf,
     uint32_t pkt_ts = rtp_get_timestamp(buf);
     bool retrasnmitted = false;
     bool success = true;
-    uint64_t now = vlc_tick_now();
 
-    if (flow->reset == 2)
-    {
-        if ((uint64_t)(now - p_sys->last_message) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) ) {
-            msg_Info(p_access, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
-        }
-        p_sys->last_message = now;
-        return success;
-    }
-    else if (flow->reset == 1)
+    if (flow->reset == 1)
     {
         msg_Info(p_access, "Traffic detected after buffer reset");
         /* First packet in the queue */
@@ -585,6 +622,7 @@ static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf,
         flow->wi = idx;
         flow->ri = idx;
         flow->reset = 0;
+        p_sys->b_flag_discontinuity = true;
     }
 
     /* Check to see if this is a retransmission or a regular packet */
@@ -725,6 +763,7 @@ static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
         msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount,
             flow->ri, flow->wi);
         p_sys->i_lost_packets += loss_amount;
+        p_sys->b_flag_discontinuity = true;
     }
 
     return pktout;
@@ -769,23 +808,30 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
     uint64_t now;
     *eof = false;
     block_t *pktout = NULL;
-    struct pollfd pfd[2];
+    struct pollfd pfd[3];
     int ret;
     ssize_t r;
     struct sockaddr_storage peer;
     socklen_t slen = sizeof(struct sockaddr_storage);
     struct rist_flow *flow = p_sys->flow;
 
-    if (vlc_killed() || (flow->reset == 1 && p_sys->eof_on_reset))
+    if (vlc_killed())
     {
         *eof = true;
         return NULL;
     }
 
+    int poll_sockets = 2;
     pfd[0].fd = flow->fd_in;
     pfd[0].events = POLLIN;
     pfd[1].fd = flow->fd_nack;
     pfd[1].events = POLLIN;
+    if (p_sys->b_ismulticast)
+    {
+        pfd[2].fd = flow->fd_rtcp_m;
+        pfd[2].events = POLLIN;
+        poll_sockets++;
+    }
 
     /* The protocol uses a fifo buffer with a fixed time delay.
      * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
@@ -795,7 +841,7 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
      * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
      * most cases. */
 
-    ret = vlc_poll_i11e(pfd, 2, p_sys->i_poll_timeout_current);
+    ret = vlc_poll_i11e(pfd, poll_sockets, p_sys->i_poll_timeout_current);
     if (unlikely(ret < 0))
         return NULL;
     else if (ret == 0)
@@ -827,6 +873,18 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
                 msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
             }
             else {
+                if (p_sys->b_ismulticast == false)
+                    rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
+            }
+        }
+        if (p_sys->b_ismulticast && pfd[2].revents & POLLIN)
+        {
+            r = rist_ReadFrom_i11e(flow->fd_rtcp_m, buf, p_sys->i_max_packet_size,
+                (struct sockaddr *)&peer, &slen);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "mcast socket %d error: %s\n",flow->fd_rtcp_m, gai_strerror(errno));
+            }
+            else {
                 rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
             }
         }
@@ -853,11 +911,6 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
                         p_sys->i_poll_timeout_nonzero_count++;
                     }
                 }
-                else
-                {
-                    if (p_sys->eof_on_reset)
-                        *eof = true;
-                }
             }
         }
 
@@ -901,13 +954,15 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
             quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
                 p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
         if (quality != 100)
-            msg_Info(p_access, "STATS: Total %u, Recovered %u, Reordered %u, Lost %u, VBR Score " \
-                "%.2f, Link Quality %.2f%%", p_sys->i_total_packets, p_sys->i_recovered_packets,
-                p_sys->i_reordered_packets, p_sys->i_lost_packets, ratio, quality);
+            msg_Info(p_access, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
+                "Score %.2f, Link Quality %.2f%%", p_sys->i_total_packets,
+                p_sys->i_recovered_packets, p_sys->i_nack_packets, p_sys->i_reordered_packets,
+                p_sys->i_lost_packets, ratio, quality);
         p_sys->i_last_stat = now;
         p_sys->vbr_ratio = 0;
         p_sys->vbr_ratio_count = 0;
         p_sys->i_lost_packets = 0;
+        p_sys->i_nack_packets = 0;
         p_sys->i_recovered_packets = 0;
         p_sys->i_reordered_packets = 0;
         p_sys->i_total_packets = 0;
@@ -943,7 +998,13 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
     }
 
     if (pktout)
+    {
+        if (p_sys->b_flag_discontinuity) {
+            pktout->i_flags |= BLOCK_FLAG_DISCONTINUITY;
+            p_sys->b_flag_discontinuity = false;
+        }
         return pktout;
+    }
     else
         return NULL;
 }
@@ -961,6 +1022,8 @@ static void Clean( stream_t *p_access )
             net_Close (p_sys->flow->fd_in);
         if (p_sys->flow->fd_nack >= 0)
             net_Close (p_sys->flow->fd_nack);
+        if (p_sys->flow->fd_rtcp_m >= 0)
+            net_Close (p_sys->flow->fd_rtcp_m);
         for (int i=0; i<RIST_QUEUE_SIZE; i++) {
             struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
             if (pkt->buffer && pkt->buffer->i_buffer > 0) {
@@ -1008,7 +1071,8 @@ static int Open(vlc_object_t *p_this)
     }
 
     /* Initialize rist flow */
-    p_sys->flow = rist_udp_receiver(p_access, &parsed_url);
+    p_sys->b_ismulticast = is_multicast_address(parsed_url.psz_host);
+    p_sys->flow = rist_udp_receiver(p_access, &parsed_url, p_sys->b_ismulticast);
     vlc_UrlClean( &parsed_url );
     if (!p_sys->flow)
     {
@@ -1017,14 +1081,23 @@ static int Open(vlc_object_t *p_this)
         goto failed;
     }
 
+    p_sys->b_flag_discontinuity = false;
+    p_sys->b_disablenacks = var_InheritBool( p_access, "disable-nacks" );
+    p_sys->b_sendblindnacks = var_InheritBool( p_access, "mcast-blind-nacks" );
+    if (p_sys->b_sendblindnacks && p_sys->b_disablenacks == false)
+        p_sys->b_sendnacks = true;
+    else
+        p_sys->b_sendnacks = false;
     p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
     p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
     p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
-    p_sys->eof_on_reset = var_InheritBool( p_access, "eof-on-reset" );
     p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
-    p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
     p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
     p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
+    if (p_sys->b_disablenacks)
+        p_sys->flow->reorder_buffer = p_sys->flow->latency;
+    else
+        p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
     msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
 
     /* Convert to rtp times */
@@ -1073,11 +1146,14 @@ vlc_module_begin ()
     add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
         NULL, true )
     add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
-    add_bool( "eof-on-reset", false, "Trigger an EOF event when a buffer reset is triggered",
-        "This is probably useful when you are decoding but not so much if you are streaming", true )
     add_integer( "nack-type", NACK_FMT_RANGE,
             N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
         change_integer_list( nack_type, nack_type_names )
+    add_bool( "disable-nacks", false, "Disable NACK output packets",
+        "Use this to disable packet recovery", true )
+    add_bool( "mcast-blind-nacks", false, "Do not check for a valid rtcp message from the encoder",
+        "Send nack messages even when we have not confirmed that the encoder is on our local " \
+        "network.", true )
 
     set_capability( "access", 0 )
     add_shortcut( "rist", "tr06" )
diff --git a/modules/access/rist.h b/modules/access/rist.h
index 75ade277d2..ec0f6ae0f9 100644
--- a/modules/access/rist.h
+++ b/modules/access/rist.h
@@ -92,11 +92,11 @@ struct rist_flow {
     char cname[MAX_CNAME];
     struct sockaddr_storage peer_sockaddr;
     socklen_t peer_socklen;
-    uint16_t ri,
-        wi;
+    uint16_t ri, wi;
     int fd_in;
     int fd_out;
     int fd_rtcp;
+    int fd_rtcp_m;
     int fd_nack;
     uint8_t nacks_retries[RIST_QUEUE_SIZE];
     uint32_t hi_timestamp;
@@ -337,3 +337,31 @@ static inline ssize_t rist_Write(int fd, const void *buf, size_t len)
 {
     return rist_WriteTo(fd, buf, len, NULL, 0);
 }
+
+/**
+ * Tells whether a host resolves to an IPv4 or IPv6 multicast address.
+ * Only the first address returned by the resolver is examined.
+ *
+ * @param psz_dst_server host name or numeric address to test
+ * @return true for a multicast destination, false otherwise or on lookup failure
+ */
+static inline bool is_multicast_address(char *psz_dst_server)
+{
+    bool ismulticast = false;
+    struct addrinfo hint = {
+        .ai_socktype = SOCK_DGRAM,
+        .ai_protocol = IPPROTO_UDP,
+        .ai_flags = AI_NUMERICSERV | AI_IDN | AI_PASSIVE,
+    }, *res;
+
+    if (vlc_getaddrinfo(psz_dst_server, 0, &hint, &res) != 0)
+        return false;
+    /* Classify the resolved binary address, not the textual host name */
+    const void *sa = res->ai_addr;
+    if (res->ai_family == AF_INET)
+        ismulticast = IN_MULTICAST(ntohl(((const struct sockaddr_in *)sa)->sin_addr.s_addr));
+    else if (res->ai_family == AF_INET6)
+        ismulticast = IN6_IS_ADDR_MULTICAST(&((const struct sockaddr_in6 *)sa)->sin6_addr);
+    freeaddrinfo(res);
+    return ismulticast;
+}
\ No newline at end of file



More information about the vlc-commits mailing list