[vlc-commits] [Git][videolan/vlc][master] 4 commits: Remove old RIST modules
Jean-Baptiste Kempf (@jbk)
gitlab at videolan.org
Sat Jul 10 11:55:04 UTC 2021
Jean-Baptiste Kempf pushed to branch master at VideoLAN / VLC
Commits:
65632fe6 by Sergio Ammirata at 2021-07-10T09:28:35+00:00
Remove old RIST modules
- - - - -
e6dd5f6a by Sergio Ammirata at 2021-07-10T09:28:35+00:00
Upgrade meson to 0.56.2 (needed to build librist)
- - - - -
ba63f304 by Sergio Ammirata at 2021-07-10T09:28:35+00:00
Add librist to contrib
Upgrade librist contrib to v0.2.4
- - - - -
1fa33c18 by Sergio Ammirata at 2021-07-10T09:28:35+00:00
Add libRIST based access and access_output modules.
These support the new RIST main profile as well as the RIST simple profile.
Co-authored-by: Gijs Peskens <gijs at peskens.net>
- - - - -
10 changed files:
- configure.ac
- + contrib/src/librist/SHA512SUMS
- + contrib/src/librist/rules.mak
- extras/tools/SHA512SUMS
- extras/tools/packages.mak
- modules/access/Makefile.am
- modules/access/rist.c
- modules/access/rist.h
- modules/access_output/Makefile.am
- modules/access_output/rist.c
Changes:
=====================================
configure.ac
=====================================
@@ -4273,6 +4273,11 @@ dnl SRT plugin
dnl
PKG_ENABLE_MODULES_VLC([SRT], [access_srt access_output_srt], [srt >= 1.3.0], [SRT input/output plugin], [auto], [], [], [-DENABLE_SRT])
+dnl
+dnl RIST plugin
+dnl
+PKG_ENABLE_MODULES_VLC([RIST], [rist access_output_rist], [librist], [RIST input/output plugin (default auto)])
+
EXTEND_HELP_STRING([Visualisations and Video filter plugins:])
dnl
dnl goom visualization plugin
=====================================
contrib/src/librist/SHA512SUMS
=====================================
@@ -0,0 +1 @@
+1c45f87bb0f0ea5a7acb529a796c546190cb5b461297930f2fad214fe0a84c76771158564d9e0ac92e8882e5e008b406655940b3babb5d198e68816cbba4bc68 librist-v0.2.4.tar.gz
=====================================
contrib/src/librist/rules.mak
=====================================
@@ -0,0 +1,29 @@
+# librist
+
+LIBRIST_VERSION := v0.2.4
+LIBRIST_URL := http://code.videolan.org/rist/librist/-/archive/$(LIBRIST_VERSION)/librist-$(LIBRIST_VERSION).tar.gz
+
+ifdef BUILD_NETWORK
+PKGS += librist
+endif
+
+ifeq ($(call need_pkg,"librist >= 0.2"),)
+PKGS_FOUND += librist
+endif
+
+LIBRIST_CONF = -Dbuilt_tools=false -Dtest=false
+
+$(TARBALLS)/librist-$(LIBRIST_VERSION).tar.gz:
+ $(call download_pkg,$(LIBRIST_URL),librist)
+
+.sum-librist: librist-$(LIBRIST_VERSION).tar.gz
+
+librist: librist-$(LIBRIST_VERSION).tar.gz .sum-librist
+ $(UNPACK)
+ $(MOVE)
+
+.librist: librist crossfile.meson
+ cd $< && rm -rf ./build
+ cd $< && $(HOSTVARS_MESON) $(MESON) $(LIBRIST_CONF) build
+ cd $< && cd build && ninja install
+ touch $@
=====================================
extras/tools/SHA512SUMS
=====================================
@@ -18,4 +18,4 @@ e9785f3d620a204b7d20222888917dc065c2036cae28667065bf7862dfa1b25235095a12fd04efdb
d24849b93de58b20f518c071687e7bfa653a96600382f36c4cf7fc1047656458f75f093b911b786b18b6931b2453cb60868ecbe07cc7d2984e5981a874b34942 help2man-1.47.6.tar.xz
8d23dde18525dccaa648ca01df40151e7f00cec4846bd611c8970dbcfc1fb57a453facfe4d41462e7c3c8bb548d44b961a04e4fc3073ab6b65063e53f42bf6fd nasm-2.14.tar.gz
1650bf9e3eddeb0b0fbb415c2b8e0a7c094421e991fa8139fd77fae0f6ee7ee980b7cf5e98d883c3a884f99abcb06fa26e3980af3a3a5bb6dd655124755782c2 ninja-1.8.2.tar.gz
-172b4de8c7474d709f172431b89bf2b2b1c2c38bc842039cccf6be075a45bd3509a1dab8512bc5b2ee025d65d8050d2f717dd15c1f9be17fca3b2e7da0d3e889 meson-0.55.1.tar.gz
+3b6cc5cae31d756b251ecde3483d3710bceff50cfd03ef6cf6f939d9e599998e61fcb03a2ee09d6a6f9bfa2198f43e7f20447359de3bff1055febcf03e82e514 meson-0.56.2.tar.gz
=====================================
extras/tools/packages.mak
=====================================
@@ -57,7 +57,7 @@ GETTEXT_URL=$(GNU)/gettext/gettext-$(GETTEXT_VERSION).tar.gz
HELP2MAN_VERSION=1.47.6
HELP2MAN_URL=$(GNU)/help2man/help2man-$(HELP2MAN_VERSION).tar.xz
-MESON_VERSION=0.55.1
+MESON_VERSION=0.56.2
MESON_URL=https://github.com/mesonbuild/meson/releases/download/$(MESON_VERSION)/meson-$(MESON_VERSION).tar.gz
NINJA_VERSION=1.8.2
=====================================
modules/access/Makefile.am
=====================================
@@ -449,8 +449,8 @@ EXTRA_LTLIBRARIES += libaccess_srt_plugin.la
### RIST ###
librist_plugin_la_SOURCES = access/rist.c access/rist.h
-librist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
-librist_plugin_la_LIBADD = $(SOCKET_LIBS)
-if HAVE_BITSTREAM
-access_LTLIBRARIES += librist_plugin.la
-endif
+librist_plugin_la_CFLAGS = $(AM_CFLAGS) $(RIST_CFLAGS)
+librist_plugin_la_LIBADD = $(RIST_LIBS) $(SOCKET_LIBS) $(LIBPTHREAD)
+librist_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(accessdir)'
+access_LTLIBRARIES += $(LTLIBrist)
+EXTRA_LTLIBRARIES += librist_plugin.la
=====================================
modules/access/rist.c
=====================================
@@ -1,11 +1,9 @@
/*****************************************************************************
* rist.c: RIST (Reliable Internet Stream Transport) input module
*****************************************************************************
- * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
- * Copyright (C) 2018, SipRadius LLC
+ * Copyright (C) 2021, SipRadius LLC
*
* Authors: Sergio Ammirata <sergio at ammirata.net>
- * Daniele Lacamera <root at danielinux.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
@@ -30,1024 +28,195 @@
#include <vlc_interrupt.h>
#include <vlc_plugin.h>
#include <vlc_access.h>
-#include <vlc_queue.h>
-#include <vlc_threads.h>
-#include <vlc_network.h>
#include <vlc_block.h>
-#include <vlc_url.h>
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#include <bitstream/ietf/rtcp_rr.h>
-#include <bitstream/ietf/rtcp_sdes.h>
-#include <bitstream/ietf/rtcp_fb.h>
-#include <bitstream/ietf/rtp.h>
+#define RIST_CFG_PREFIX "rist-"
#include "rist.h"
-/* The default latency is 1000 ms */
-#define RIST_DEFAULT_LATENCY 1000
-/* The default nack retry interval */
-#define RIST_DEFAULT_RETRY_INTERVAL 132
-/* The default packet re-ordering buffer */
-#define RIST_DEFAULT_REORDER_BUFFER 70
-/* The default max packet size */
-#define RIST_MAX_PACKET_SIZE 1472
-/* The default timeout is 5 ms */
-#define RIST_DEFAULT_POLL_TIMEOUT 5
-/* The max retry count for nacks */
-#define RIST_MAX_RETRIES 10
-/* The rate at which we process and send nack requests */
-#define NACK_INTERVAL 5 /*ms*/
-/* Calculate and print stats once per second */
-#define STATS_INTERVAL 1000 /*ms*/
+#define NACK_FMT_RANGE 0
+#define NACK_FMT_BITMASK 1
-static const int nack_type[] = {
- 0, 1,
+static const int nack_type_values[] = {
+ NACK_FMT_RANGE, NACK_FMT_BITMASK,
};
static const char *const nack_type_names[] = {
N_("Range"), N_("Bitmask"),
};
-enum NACK_TYPE {
- NACK_FMT_RANGE = 0,
- NACK_FMT_BITMASK
-};
-
typedef struct
{
- struct rist_flow *flow;
- char sender_name[MAX_CNAME];
- enum NACK_TYPE nack_type;
- uint64_t last_data_rx;
- uint64_t last_nack_tx;
- vlc_thread_t thread;
- int i_max_packet_size;
- int i_poll_timeout;
- int i_poll_timeout_current;
- bool b_ismulticast;
- bool b_sendnacks;
- bool b_sendblindnacks;
- bool b_disablenacks;
- bool b_flag_discontinuity;
- bool dead;
- vlc_queue_t queue;
- vlc_mutex_t lock;
- uint64_t last_message;
- uint64_t last_reset;
- /* stat variables */
- uint32_t i_poll_timeout_zero_count;
- uint32_t i_poll_timeout_nonzero_count;
- uint64_t i_last_stat;
- float vbr_ratio;
- uint16_t vbr_ratio_count;
- uint32_t i_lost_packets;
- uint32_t i_nack_packets;
- uint32_t i_recovered_packets;
- uint32_t i_reordered_packets;
- uint32_t i_total_packets;
+ struct rist_ctx *receiver_ctx;
+ int gre_filter_dst_port;
+ uint32_t cumulative_loss;
+ uint32_t flow_id;
+ bool eof;
+ int i_recovery_buffer;
+ int i_maximum_jitter;
+ struct rist_logging_settings logging_settings;
+ vlc_mutex_t lock;
+ struct rist_data_block *rist_items[RIST_MAX_QUEUE_BUFFERS];
} stream_sys_t;
-static int Control(stream_t *p_access, int i_query, va_list args)
-{
- switch( i_query )
- {
- case STREAM_CAN_SEEK:
- case STREAM_CAN_FASTSEEK:
- case STREAM_CAN_PAUSE:
- case STREAM_CAN_CONTROL_PACE:
- *va_arg( args, bool * ) = false;
- break;
-
- case STREAM_GET_PTS_DELAY:
- *va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
- var_InheritInteger(p_access, "network-caching") );
- break;
-
- default:
- return VLC_EGENERIC;
- }
-
- return VLC_SUCCESS;
-}
-
-static struct rist_flow *rist_init_rx(void)
-{
- struct rist_flow *flow = calloc(1, sizeof(struct rist_flow));
- if (!flow)
- return NULL;
-
- flow->reset = 1;
- flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
-
- if ( unlikely( flow->buffer == NULL ) )
- {
- free(flow);
- return NULL;
- }
- flow->fd_in = -1;
- flow->fd_nack = -1;
- flow->fd_rtcp_m = -1;
-
- return flow;
-}
-
-static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf, size_t len,
- const struct sockaddr *peer, socklen_t slen)
-{
- vlc_mutex_lock( &lock );
- rist_WriteTo_i11e(fd, buf, len, peer, slen);
- vlc_mutex_unlock( &lock );
-}
-
-static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url, bool b_ismulticast)
-{
- stream_sys_t *p_sys = p_access->p_sys;
- msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
- parsed_url->psz_host, parsed_url->i_port,
- parsed_url->psz_host, parsed_url->i_port+1);
-
- p_sys->flow = rist_init_rx();
- if (!p_sys->flow)
- return NULL;
-
- p_sys->flow->fd_in = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port, NULL,
- 0, IPPROTO_UDP);
- if (p_sys->flow->fd_in < 0)
- {
- msg_Err( p_access, "cannot open input socket" );
- goto fail;
- }
-
- if (b_ismulticast)
- {
- p_sys->flow->fd_rtcp_m = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
- NULL, 0, IPPROTO_UDP);
- if (p_sys->flow->fd_rtcp_m < 0)
- {
- msg_Err( p_access, "cannot open multicast nack socket" );
- goto fail;
- }
- p_sys->flow->fd_nack = net_ConnectDgram(p_access, parsed_url->psz_host,
- parsed_url->i_port + 1, -1, IPPROTO_UDP );
- }
- else
- {
- p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
- NULL, 0, IPPROTO_UDP);
- }
- if (p_sys->flow->fd_nack < 0)
- {
- msg_Err( p_access, "cannot open nack socket" );
- goto fail;
- }
-
- populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
- msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
-
- return p_sys->flow;
-
-fail:
- if (p_sys->flow->fd_in != -1)
- vlc_close(p_sys->flow->fd_in);
- if (p_sys->flow->fd_nack != -1)
- vlc_close(p_sys->flow->fd_nack);
- if (p_sys->flow->fd_rtcp_m != -1)
- vlc_close(p_sys->flow->fd_rtcp_m);
- free(p_sys->flow->buffer);
- free(p_sys->flow);
- return NULL;
-}
-
-static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
-{
- if (flow->ri <= flow->wi) {
- return ((idx > flow->ri) && (idx <= flow->wi));
- } else {
- return ((idx > flow->ri) || (idx <= flow->wi));
- }
-}
-
-static void send_rtcp_feedback(stream_t *p_access, struct rist_flow *flow)
+static int cb_stats(void *arg, const struct rist_stats *stats_container)
{
+ stream_t *p_access = (stream_t*)arg;
stream_sys_t *p_sys = p_access->p_sys;
- int namelen = strlen(flow->cname) + 1;
- /* we need to make sure it is a multiple of 4, pad if necessary */
- if ((namelen - 2) & 0x3)
- namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
+ msg_Dbg(p_access, "[RIST-STATS]: %s", stats_container->stats_json);
- int rtcp_feedback_size = RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE + namelen;
- uint8_t *buf = malloc(rtcp_feedback_size);
- if ( unlikely( buf == NULL ) )
- return;
+ const struct rist_stats_receiver_flow *stats_receiver_flow = &stats_container->stats.receiver_flow;
- /* Populate RR */
- uint8_t *rr = buf;
- rtp_set_hdr(rr);
- rtcp_rr_set_pt(rr);
- rtcp_set_length(rr, 1);
- rtcp_fb_set_int_ssrc_pkt_sender(rr, 0);
+ p_sys->cumulative_loss += stats_receiver_flow->lost;
+ msg_Dbg(p_access, "[RIST-STATS]: received %"PRIu64", missing %"PRIu32", reordered %"PRIu32", recovered %"PRIu32", lost %"PRIu32", Q %.2f, max jitter (us) %"PRIu64", rtt %"PRIu32"ms, cumulative loss %"PRIu32"",
+ stats_receiver_flow->received,
+ stats_receiver_flow->missing,
+ stats_receiver_flow->reordered,
+ stats_receiver_flow->recovered,
+ stats_receiver_flow->lost,
+ stats_receiver_flow->quality,
+ stats_receiver_flow->max_inter_packet_spacing,
+ stats_receiver_flow->rtt,
+ p_sys->cumulative_loss
+ );
- /* Populate SDES */
- uint8_t *p_sdes = (buf + RTCP_EMPTY_RR_SIZE);
- rtp_set_hdr(p_sdes);
- rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
- rtcp_sdes_set_pt(p_sdes);
- rtcp_set_length(p_sdes, (namelen >> 2) + 2);
- rtcp_sdes_set_cname(p_sdes, 1);
- rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
- uint8_t *p_sdes_name = (buf + RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE);
- strlcpy((char *)p_sdes_name, flow->cname, namelen);
-
- /* Write to Socket */
- rist_WriteTo_i11e_Locked(p_sys->lock, flow->fd_nack, buf, rtcp_feedback_size,
- (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
- free(buf);
- buf = NULL;
-}
-
-static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
-{
- stream_sys_t *p_sys = p_access->p_sys;
- struct rist_flow *flow = p_sys->flow;
- int len = 0;
-
- int bbnack_bufsize = RTCP_FB_HEADER_SIZE +
- RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
- uint8_t *buf = malloc(bbnack_bufsize);
- if ( unlikely( buf == NULL ) )
- return;
-
- /* Populate NACKS */
- uint8_t *nack = buf;
- rtp_set_hdr(nack);
- rtcp_fb_set_fmt(nack, NACK_FMT_BITMASK);
- rtcp_set_pt(nack, RTCP_PT_RTPFB);
- rtcp_set_length(nack, 2 + nack_count);
- /*uint8_t name[4] = "RIST";*/
- /*rtcp_fb_set_ssrc_media_src(nack, name);*/
- len += RTCP_FB_HEADER_SIZE;
- /* TODO : group together */
- uint16_t nacks[MAX_NACKS];
- memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
- for (int i = 0; i < nack_count; i++) {
- uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
- rtcp_fb_nack_set_packet_id(nack_record, nacks[i]);
- rtcp_fb_nack_set_bitmask_lost(nack_record, 0);
- }
- len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
-
- /* Write to Socket */
- if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
- rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
- (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
- free(buf);
- buf = NULL;
-}
-
-static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
-{
- stream_sys_t *p_sys = p_access->p_sys;
- struct rist_flow *flow = p_sys->flow;
- int len = 0;
-
- int rbnack_bufsize = RTCP_FB_HEADER_SIZE +
- RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
- uint8_t *buf = malloc(rbnack_bufsize);
- if ( unlikely( buf == NULL ) )
- return;
-
- /* Populate NACKS */
- uint8_t *nack = buf;
- rtp_set_hdr(nack);
- rtcp_fb_set_fmt(nack, NACK_FMT_RANGE);
- rtcp_set_pt(nack, RTCP_PT_RTPFR);
- rtcp_set_length(nack, 2 + nack_count);
- uint8_t name[4] = "RIST";
- rtcp_fb_set_ssrc_media_src(nack, name);
- len += RTCP_FB_HEADER_SIZE;
- /* TODO : group together */
- uint16_t nacks[MAX_NACKS];
- memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
- for (int i = 0; i < nack_count; i++)
+ if ((int)stats_receiver_flow->max_inter_packet_spacing > p_sys->i_recovery_buffer * 1000)
{
- uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
- rtcp_fb_nack_set_range_start(nack_record, nacks[i]);
- rtcp_fb_nack_set_range_extra(nack_record, 0);
+ msg_Err(p_access, "The IP network jitter exceeded your recovery buffer size, %d > %d us, you should increase the recovery buffer size or fix your source/network jitter",
+ (int)stats_receiver_flow->max_inter_packet_spacing, p_sys->i_recovery_buffer *1000);
}
- len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
-
- /* Write to Socket */
- if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
- rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
- (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
- free(buf);
- buf = NULL;
-}
-static void send_nacks(stream_t *p_access, struct rist_flow *flow)
-{
- stream_sys_t *p_sys = p_access->p_sys;
- struct rtp_pkt *pkt;
- uint16_t idx;
- uint64_t last_ts = 0;
- uint16_t null_count = 0;
- int nacks_len = 0;
- uint16_t nacks[MAX_NACKS];
-
- idx = flow->ri;
- while(idx++ != flow->wi)
- {
- pkt = &(flow->buffer[idx]);
- if (pkt->buffer == NULL)
- {
- if (nacks_len + 1 >= MAX_NACKS)
- {
- break;
- }
- else
- {
- null_count++;
- /* TODO: after adding average spacing calculation, change this formula
- to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
- uint64_t extrapolated_ts = last_ts;
- /* Find out the age and add it only if necessary */
- int retry_count = flow->nacks_retries[idx];
- uint64_t age = flow->hi_timestamp - extrapolated_ts;
- uint64_t expiration;
- if (retry_count == 0){
- expiration = flow->reorder_buffer;
- } else {
- expiration = (uint64_t)flow->nacks_retries[idx] * (uint64_t)flow->retry_interval;
- }
- if (age > expiration && retry_count <= flow->max_retries)
- {
- flow->nacks_retries[idx]++;
- nacks[nacks_len++] = idx;
- msg_Dbg(p_access, "Sending NACK for seq %d, age %"PRId64" ms, retry %u, " \
- "expiration %"PRId64" ms", idx, ts_get_from_rtp(age)/1000,
- flow->nacks_retries[idx], ts_get_from_rtp(expiration)/1000);
- }
- }
- }
- else
- {
- last_ts = pkt->rtp_ts;
- null_count = 0;
- }
- }
- if (nacks_len > 0)
+ if ((int)stats_receiver_flow->rtt > (p_sys->i_recovery_buffer))
{
- p_sys->i_nack_packets += nacks_len;
- block_t *pkt_nacks = block_Alloc(nacks_len * 2);
- if (pkt_nacks)
- {
- memcpy(pkt_nacks->p_buffer, nacks, nacks_len * 2);
- pkt_nacks->i_buffer = nacks_len * 2;
- vlc_queue_Enqueue(&p_sys->queue, pkt_nacks);
- }
+ msg_Err(p_access, "The RTT between us and the sender is higher than the configured recovery buffer size, %"PRIu32" > %d ms, you should increase the recovery buffer size",
+ stats_receiver_flow->rtt, p_sys->i_recovery_buffer);
}
-}
-
-static int sockaddr_cmp(struct sockaddr *x, struct sockaddr *y)
-{
-#define CMP(a, b) if (a != b) return a < b ? -1 : 1
- CMP(x->sa_family, y->sa_family);
- if (x->sa_family == AF_INET)
- {
- struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
- CMP(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr));
- CMP(ntohs(xin->sin_port), ntohs(yin->sin_port));
- }
- else if (x->sa_family == AF_INET6)
- {
- struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
- int r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr,
- sizeof(xin6->sin6_addr.s6_addr));
- if (r != 0)
- return r;
- CMP(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port));
- CMP(xin6->sin6_flowinfo, yin6->sin6_flowinfo);
- CMP(xin6->sin6_scope_id, yin6->sin6_scope_id);
+ vlc_mutex_lock( &p_sys->lock );
+ /* Trigger the appropriate response when there is no more data */
+ /* status of 1 is no data for one buffer length */
+ /* status of 2 is no data for 60 seconds, i.e. session timeout */
+ if (p_sys->flow_id == stats_receiver_flow->flow_id && stats_receiver_flow->status == 2) {
+ p_sys->eof = true;
}
+ vlc_mutex_unlock( &p_sys->lock );
-#undef CMP
+ rist_stats_free(stats_container);
return 0;
}
-static void print_sockaddr_info_change(stream_t *p_access, struct sockaddr *x, struct sockaddr *y)
-{
- if (x->sa_family == AF_INET)
- {
- struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
- msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
- inet_ntoa(xin->sin_addr), ntohs(xin->sin_port), inet_ntoa(yin->sin_addr),
- ntohs(yin->sin_port));
- }
- else if (x->sa_family == AF_INET6)
- {
- struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
- char oldstr[INET6_ADDRSTRLEN];
- char newstr[INET6_ADDRSTRLEN];
- inet_ntop(xin6->sin6_family, &xin6->sin6_addr, oldstr, sizeof(struct in6_addr));
- inet_ntop(yin6->sin6_family, &yin6->sin6_addr, newstr, sizeof(struct in6_addr));
- msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
- oldstr, ntohs(xin6->sin6_port), newstr, ntohs(yin6->sin6_port));
- }
-}
-
-static void print_sockaddr_info(stream_t *p_access, struct sockaddr *x)
-{
- if (x->sa_family == AF_INET)
- {
- struct sockaddr_in *xin = (void*)x;
- msg_Info(p_access, "Peer IP:Port %s:%d", inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
- }
- else if (x->sa_family == AF_INET6)
- {
- struct sockaddr_in6 *xin6 = (void*)x;
- char str[INET6_ADDRSTRLEN];
- inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
- msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin6->sin6_port));
- }
-}
-
-static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_in, size_t len,
- struct sockaddr *peer, socklen_t slen)
-{
- stream_sys_t *p_sys = p_access->p_sys;
- uint8_t ptype;
- size_t processed_bytes = 0;
- uint16_t records;
- char new_sender_name[MAX_CNAME];
- uint8_t *buf;
-
- while (processed_bytes < len) {
- buf = buf_in + processed_bytes;
- /* safety checks */
- size_t bytes_left = len - processed_bytes + 1;
- if ( bytes_left < 4 )
- {
- /* we must have at least 4 bytes */
- msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %zu",
- bytes_left);
- return;
- }
- else if (!rtp_check_hdr(buf))
- {
- /* check for a valid rtp header */
- msg_Err(p_access, "Malformed rtcp packet starting with %02x, ignoring.", buf[0]);
- return;
- }
-
- ptype = rtcp_get_pt(buf);
- records = rtcp_get_length(buf);
- uint16_t bytes = (uint16_t)(4 * (1 + records));
- if (bytes > bytes_left)
- {
- /* check for a sane number of bytes */
- msg_Err(p_access, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
- "packet, got a buffer of %zu bytes.", rtcp_get_length(buf), bytes, bytes_left);
- return;
- }
-
- switch(ptype) {
- case RTCP_PT_RTPFR:
- case RTCP_PT_RTPFB:
- break;
-
- case RTCP_PT_RR:
- break;
-
- case RTCP_PT_SDES:
- {
- if (p_sys->b_sendnacks == false)
- p_sys->b_sendnacks = true;
- if (p_sys->b_ismulticast)
- return;
- /* Check for changes in source IP address or port */
- uint8_t name_length = rtcp_sdes_get_name_length(buf);
- if (name_length > bytes_left ||
- name_length > sizeof(new_sender_name))
- {
- /* check for a sane number of bytes */
- msg_Err(p_access, "Malformed SDES packet, wrong cname len %"PRIu8", got a " \
- "buffer of %zu bytes.", name_length, bytes_left);
- return;
- }
- bool ip_port_changed = false;
- if (sockaddr_cmp((struct sockaddr *)&flow->peer_sockaddr, peer) != 0)
- {
- ip_port_changed = true;
- if(flow->peer_socklen > 0)
- print_sockaddr_info_change(p_access,
- (struct sockaddr *)&flow->peer_sockaddr, peer);
- else
- print_sockaddr_info(p_access, peer);
- vlc_mutex_lock( &p_sys->lock );
- memcpy(&flow->peer_sockaddr, peer, sizeof(struct sockaddr_storage));
- flow->peer_socklen = slen;
- vlc_mutex_unlock( &p_sys->lock );
- }
-
- /* Check for changes in cname */
- bool peer_name_changed = false;
- memset(new_sender_name, 0, MAX_CNAME);
- memcpy(new_sender_name, buf + RTCP_SDES_SIZE, name_length);
- if (memcmp(new_sender_name, p_sys->sender_name, name_length) != 0)
- {
- peer_name_changed = true;
- if (strcmp(p_sys->sender_name, "") == 0)
- msg_Info(p_access, "Peer Name: %s", new_sender_name);
- else
- msg_Info(p_access, "Peer Name change detected: old Name: %s, new " \
- "Name: %s", p_sys->sender_name, new_sender_name);
- memset(p_sys->sender_name, 0, MAX_CNAME);
- memcpy(p_sys->sender_name, buf + RTCP_SDES_SIZE, name_length);
- }
-
- /* Reset the buffer as the source must have been restarted */
- if (peer_name_changed || ip_port_changed)
- {
- /* reset the buffer */
- flow->reset = 1;
- }
- }
- break;
-
- case RTCP_PT_SR:
- if (p_sys->b_sendnacks == false)
- p_sys->b_sendnacks = true;
- if (p_sys->b_ismulticast)
- return;
- break;
-
- default:
- msg_Err(p_access, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
- }
- processed_bytes += (4 * (1 + records));
- }
-}
-
-static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf, size_t len)
+static int Control(stream_t *p_access, int i_query, va_list args)
{
- stream_sys_t *p_sys = p_access->p_sys;
-
- /* safety checks */
- if ( len < RTP_HEADER_SIZE )
- {
- /* check if packet size >= rtp header size */
- msg_Err(p_access, "Rist rtp packet must have at least 12 bytes, we have %zu", len);
- return false;
- }
- else if (!rtp_check_hdr(buf))
- {
- /* check for a valid rtp header */
- msg_Err(p_access, "Malformed rtp packet header starting with %02x, ignoring.", buf[0]);
- return false;
- }
-
- uint16_t idx = rtp_get_seqnum(buf);
- uint32_t pkt_ts = rtp_get_timestamp(buf);
- bool retrasnmitted = false;
- bool success = true;
-
- if (flow->reset == 1)
- {
- msg_Info(p_access, "Traffic detected after buffer reset");
- /* First packet in the queue */
- flow->hi_timestamp = pkt_ts;
- msg_Info(p_access, "ts@%u", flow->hi_timestamp);
- flow->wi = idx;
- flow->ri = idx;
- flow->reset = 0;
- p_sys->b_flag_discontinuity = true;
- }
-
- /* Check to see if this is a retransmission or a regular packet */
- if (buf[11] & (1 << 0))
- {
- msg_Dbg(p_access, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx, flow->ri, flow->wi,
- flow->wi-flow->ri);
- p_sys->i_recovered_packets++;
- retrasnmitted = true;
- }
- else if (flow->wi != flow->ri)
- {
- /* Reset counter to 0 on incoming holes */
- /* Regular packets only as retransmits are expected to come in out of order */
- uint16_t idxnext = (uint16_t)(flow->wi + 1);
- if (idx != idxnext)
- {
- if (idx > idxnext) {
- msg_Dbg(p_access, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
- idx, idxnext, idx - idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
- } else {
- p_sys->i_reordered_packets++;
- msg_Dbg(p_access, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx,
- idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
- }
- uint16_t zero_counter = (uint16_t)(flow->wi + 1);
- while(zero_counter++ != idx) {
- flow->nacks_retries[zero_counter] = 0;
- }
- /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
- idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
- }
- }
-
- /* Always replace the existing one with the new one */
- struct rtp_pkt *pkt;
- pkt = &(flow->buffer[idx]);
- if (pkt->buffer && pkt->buffer->i_buffer > 0)
+ switch( i_query )
{
- block_Release(pkt->buffer);
- pkt->buffer = NULL;
- }
- pkt->buffer = block_Alloc(len);
- if (!pkt->buffer)
- return false;
-
- pkt->buffer->i_buffer = len;
- memcpy(pkt->buffer->p_buffer, buf, len);
- pkt->rtp_ts = pkt_ts;
- p_sys->last_data_rx = vlc_tick_now();
- /* Reset the try counter regardless of wether it was a retransmit or not */
- flow->nacks_retries[idx] = 0;
+ case STREAM_CAN_SEEK:
+ case STREAM_CAN_FASTSEEK:
+ case STREAM_CAN_PAUSE:
+ case STREAM_CAN_CONTROL_PACE:
+ *va_arg( args, bool * ) = false;
+ break;
- if (retrasnmitted)
- return success;
+ case STREAM_GET_PTS_DELAY:
+ *va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
+ var_InheritInteger(p_access, "network-caching") );
+ break;
- p_sys->i_total_packets++;
- /* Perform discontinuity checks and udpdate counters */
- if (!is_index_in_range(flow, idx) && pkt_ts >= flow->hi_timestamp)
- {
- if ((pkt_ts - flow->hi_timestamp) > flow->hi_timestamp/10)
- {
- msg_Info(p_access, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
- flow->wi, pkt_ts, flow->hi_timestamp);
- flow->reset = 1;
- success = false;
- }
- else
- {
- flow->wi = idx;
- flow->hi_timestamp = pkt_ts;
- }
- }
- else if (!is_index_in_range(flow, idx))
- {
- /* incoming timestamp just jumped back in time or index is outside of scope */
- msg_Info(p_access, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
- flow->wi, pkt_ts, flow->hi_timestamp);
- flow->reset = 1;
- success = false;
+ default:
+ return VLC_EGENERIC;
}
- return success;
+ return VLC_SUCCESS;
}
-static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
+static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
{
stream_sys_t *p_sys = p_access->p_sys;
block_t *pktout = NULL;
- struct rtp_pkt *pkt;
- uint16_t idx;
- if (flow->ri == flow->wi || flow->reset > 0)
- return NULL;
-
- idx = flow->ri;
- bool found_data = false;
- uint16_t loss_amount = 0;
- while(idx++ != flow->wi) {
+ struct rist_data_block *rist_buffer = NULL;
+ size_t i_total_size, i_rist_items_index;
+ int i_flags, ret;
+ *eof = false;
+ i_rist_items_index = i_flags = i_total_size = 0;
+ int i_read_timeout_ms = p_sys->i_maximum_jitter;
- pkt = &(flow->buffer[idx]);
- if (!pkt->buffer)
- {
- /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
- loss_amount++;
- /* We move ahead until we find a timestamp but we do not move the cursor.
- * None of them are guaranteed packet loss because we do not really
- * know their timestamps. They might still arrive on the next loop.
- * We can confirm the loss only if we get a valid packet in the loop below. */
+ while ((ret = rist_receiver_data_read2(p_sys->receiver_ctx, &rist_buffer, i_read_timeout_ms)) > 0)
+ {
+ if (p_sys->gre_filter_dst_port > 0 && rist_buffer->virt_dst_port != p_sys->gre_filter_dst_port) {
+ rist_receiver_data_block_free2(&rist_buffer);
continue;
}
-
- /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx,
- flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
- if (flow->hi_timestamp > (uint32_t)(pkt->rtp_ts + flow->rtp_latency))
- {
- /* Populate output packet now but remove rtp header from source */
- int newSize = pkt->buffer->i_buffer - RTP_HEADER_SIZE;
- pktout = block_Alloc(newSize);
- if (pktout)
- {
- pktout->i_buffer = newSize;
- memcpy(pktout->p_buffer, pkt->buffer->p_buffer + RTP_HEADER_SIZE, newSize);
- /* free the buffer and increase the read index */
- flow->ri = idx;
- /* TODO: calculate average duration using buffer average (bring from sender) */
- found_data = true;
+ i_read_timeout_ms = 0;
+
+ p_sys->rist_items[i_rist_items_index++] = rist_buffer;
+ i_total_size += rist_buffer->payload_len;
+ vlc_mutex_lock( &p_sys->lock );
+ if (p_sys->flow_id != rist_buffer->flow_id ||
+ rist_buffer->flags == RIST_DATA_FLAGS_DISCONTINUITY ||
+ rist_buffer->flags == RIST_DATA_FLAGS_FLOW_BUFFER_START) {
+ if (p_sys->flow_id != rist_buffer->flow_id) {
+ msg_Info(p_access, "New flow detected with id %"PRIu32"", rist_buffer->flow_id);
+ p_sys->flow_id = rist_buffer->flow_id;
}
- block_Release(pkt->buffer);
- pkt->buffer = NULL;
+ i_flags = BLOCK_FLAG_DISCONTINUITY;
+ vlc_mutex_unlock( &p_sys->lock );
break;
}
-
- }
-
- if (loss_amount > 0 && found_data == true)
- {
- /* Packet loss confirmed, we found valid data after the holes */
- msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount,
- flow->ri, flow->wi);
- p_sys->i_lost_packets += loss_amount;
- p_sys->b_flag_discontinuity = true;
- }
-
- return pktout;
-}
-
-static void *rist_thread(void *data)
-{
- stream_t *p_access = data;
- stream_sys_t *p_sys = p_access->p_sys;
- block_t *pkt_nacks;
-
- /* Process nacks every 5ms */
- /* We only ask for the relevant ones */
- while ((pkt_nacks = vlc_queue_DequeueKillable(&p_sys->queue,
- &p_sys->dead)) != NULL) {
- /* there are two bytes per nack */
- uint16_t nack_count = (uint16_t)pkt_nacks->i_buffer/2;
- switch(p_sys->nack_type) {
- case NACK_FMT_BITMASK:
- send_bbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
- break;
-
- default:
- send_rbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
- }
-
- if (nack_count > 1)
- msg_Dbg(p_access, "Sent %u NACKs !!!", nack_count);
- block_Release(pkt_nacks);
+ vlc_mutex_unlock( &p_sys->lock );
+ // Make sure we never read more than our array size
+ if (i_rist_items_index == (RIST_MAX_QUEUE_BUFFERS -1))
+ break;
}
- return NULL;
-}
-
-static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
-{
- stream_sys_t *p_sys = p_access->p_sys;
- uint64_t now;
- *eof = false;
- block_t *pktout = NULL;
- struct pollfd pfd[3];
- int ret;
- ssize_t r;
- struct sockaddr_storage peer;
- socklen_t slen = sizeof(struct sockaddr_storage);
- struct rist_flow *flow = p_sys->flow;
+ if (ret > 50)
+ msg_Dbg(p_access, "Falling behind reading rist buffer by %d packets", ret);
- if (vlc_killed())
- {
+ if (ret < 0) {
+ msg_Err(p_access, "Unrecoverable error %i while reading from rist, ending stream", ret);
*eof = true;
- return NULL;
+ goto failed_cleanup;
}
- int poll_sockets = 2;
- pfd[0].fd = flow->fd_in;
- pfd[0].events = POLLIN;
- pfd[1].fd = flow->fd_nack;
- pfd[1].events = POLLIN;
- if (p_sys->b_ismulticast)
- {
- pfd[2].fd = flow->fd_rtcp_m;
- pfd[2].events = POLLIN;
- poll_sockets++;
- }
-
- /* The protocol uses a fifo buffer with a fixed time delay.
- * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
- * packets. If I waited indefinitely for data coming in, the rate and delay of output packets
- * would be wrong. I am calling the rist_dequeue function every time a data packet comes in
- * and also every time we get a poll timeout. The configurable poll timeout is for controling
- * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
- * most cases. */
-
- ret = vlc_poll_i11e(pfd, poll_sockets, p_sys->i_poll_timeout_current);
- if (unlikely(ret < 0))
+ if (i_total_size == 0) {
return NULL;
- else if (ret == 0)
- {
- /* Poll timeout, check the queue for the next packet that needs to be delivered */
- pktout = rist_dequeue(p_access, flow);
- /* if there is data, we need to come back faster to finish emptying it */
- if (pktout) {
- p_sys->i_poll_timeout_current = 0;
- p_sys->i_poll_timeout_zero_count++;
- } else {
- p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
- p_sys->i_poll_timeout_nonzero_count++;
- }
- }
- else
- {
-
- uint8_t *buf = malloc(p_sys->i_max_packet_size);
- if ( unlikely( buf == NULL ) )
- return NULL;
-
- /* Process rctp incoming data */
- if (pfd[1].revents & POLLIN)
- {
- r = rist_ReadFrom_i11e(flow->fd_nack, buf, p_sys->i_max_packet_size,
- (struct sockaddr *)&peer, &slen);
- if (unlikely(r == -1)) {
- msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
- }
- else {
- if (p_sys->b_ismulticast == false)
- rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
- }
- }
- if (p_sys->b_ismulticast && pfd[2].revents & POLLIN)
- {
- r = rist_ReadFrom_i11e(flow->fd_rtcp_m, buf, p_sys->i_max_packet_size,
- (struct sockaddr *)&peer, &slen);
- if (unlikely(r == -1)) {
- msg_Err(p_access, "mcast socket %d error: %s\n",flow->fd_rtcp_m, gai_strerror(errno));
- }
- else {
- rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
- }
- }
-
- /* Process regular incoming data */
- if (pfd[0].revents & POLLIN)
- {
- r = rist_Read_i11e(flow->fd_in, buf, p_sys->i_max_packet_size);
- if (unlikely(r == -1)) {
- msg_Err(p_access, "socket %d error: %s\n", flow->fd_in, gai_strerror(errno));
- }
- else
- {
- /* rist_input will process and queue the pkt */
- if (rist_input(p_access, flow, buf, r))
- {
- /* Check the queue for the next packet that needs to be delivered */
- pktout = rist_dequeue(p_access, flow);
- if (pktout) {
- p_sys->i_poll_timeout_current = 0;
- p_sys->i_poll_timeout_zero_count++;
- } else {
- p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
- p_sys->i_poll_timeout_nonzero_count++;
- }
- }
- }
- }
-
- free(buf);
- buf = NULL;
}
- now = vlc_tick_now();
-
- /* Process stats and print them out */
- /* We need to measure some items every 70ms */
- uint64_t interval = (now - flow->feedback_time);
- if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
- {
- if (p_sys->i_poll_timeout_nonzero_count > 0)
- {
- float ratio = (float)p_sys->i_poll_timeout_zero_count
- / (float)p_sys->i_poll_timeout_nonzero_count;
- if (ratio <= 1)
- p_sys->vbr_ratio += 1 - ratio;
- else
- p_sys->vbr_ratio += ratio - 1;
- p_sys->vbr_ratio_count++;
- /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f",
- p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
- p_sys->i_poll_timeout_zero_count = 0;
- p_sys->i_poll_timeout_nonzero_count = 0;
- }
- }
- /* We print out the stats once per second */
- interval = (now - p_sys->i_last_stat);
- if ( interval > VLC_TICK_FROM_MS(STATS_INTERVAL) )
- {
- if ( p_sys->i_lost_packets > 0)
- msg_Err(p_access, "We have %d lost packets", p_sys->i_lost_packets);
- float ratio = 1;
- if (p_sys->vbr_ratio_count > 0)
- ratio = p_sys->vbr_ratio / (float)p_sys->vbr_ratio_count;
- float quality = 100;
- if (p_sys->i_total_packets > 0)
- quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
- p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
- if (quality != 100)
- msg_Info(p_access, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
- "Score %.2f, Link Quality %.2f%%", p_sys->i_total_packets,
- p_sys->i_recovered_packets, p_sys->i_nack_packets, p_sys->i_reordered_packets,
- p_sys->i_lost_packets, ratio, quality);
- p_sys->i_last_stat = now;
- p_sys->vbr_ratio = 0;
- p_sys->vbr_ratio_count = 0;
- p_sys->i_lost_packets = 0;
- p_sys->i_nack_packets = 0;
- p_sys->i_recovered_packets = 0;
- p_sys->i_reordered_packets = 0;
- p_sys->i_total_packets = 0;
+ // Prepare one large buffer (when we are behing in reading, otherwise it is the same size as what is being read)
+ pktout = block_Alloc(i_total_size);
+ if ( unlikely(pktout == NULL) ) {
+ goto failed_cleanup;
}
- /* Send rtcp feedback every RTCP_INTERVAL */
- interval = (now - flow->feedback_time);
- if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
- {
- /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval,
- VLC_TICK_FROM_MS(RTCP_INTERVAL)); */
- send_rtcp_feedback(p_access, flow);
- flow->feedback_time = now;
- }
-
- /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
- interval = (now - p_sys->last_nack_tx);
- if ( interval > VLC_TICK_FROM_MS(NACK_INTERVAL) )
- {
- send_nacks(p_access, p_sys->flow);
- p_sys->last_nack_tx = now;
- }
-
- /* Safety check for when the input stream stalls */
- if ( p_sys->last_data_rx > 0 && now > p_sys->last_data_rx &&
- (uint64_t)(now - p_sys->last_data_rx) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) &&
- (uint64_t)(now - p_sys->last_reset) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) )
- {
- msg_Err(p_access, "No data received for %"PRId64" ms, resetting buffers",
- (int64_t)(now - p_sys->last_data_rx)/1000);
- p_sys->last_reset = now;
- flow->reset = 1;
+ size_t block_offset = 0;
+ for(size_t i = 0; i < i_rist_items_index; ++i) {
+ memcpy(pktout->p_buffer + block_offset, p_sys->rist_items[i]->payload, p_sys->rist_items[i]->payload_len);
+ block_offset += p_sys->rist_items[i]->payload_len;
+ rist_receiver_data_block_free2(& p_sys->rist_items[i]);
}
+ pktout->i_flags = i_flags;
+ return pktout;
- if (pktout)
- {
- if (p_sys->b_flag_discontinuity) {
- pktout->i_flags |= BLOCK_FLAG_DISCONTINUITY;
- p_sys->b_flag_discontinuity = false;
+failed_cleanup:
+ if (i_total_size > 0) {
+ for (size_t i = 0; i < i_rist_items_index; i++) {
+ rist_receiver_data_block_free2(&p_sys->rist_items[i]);
}
- return pktout;
}
- else
- return NULL;
+ return NULL;
}
-static void Clean( stream_t *p_access )
-{
- stream_sys_t *p_sys = p_access->p_sys;
-
- if (p_sys->flow)
- {
- if (p_sys->flow->fd_in >= 0)
- net_Close (p_sys->flow->fd_in);
- if (p_sys->flow->fd_nack >= 0)
- net_Close (p_sys->flow->fd_nack);
- if (p_sys->flow->fd_rtcp_m >= 0)
- net_Close (p_sys->flow->fd_rtcp_m);
- for (int i=0; i<RIST_QUEUE_SIZE; i++) {
- struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
- if (pkt->buffer && pkt->buffer->i_buffer > 0) {
- block_Release(pkt->buffer);
- pkt->buffer = NULL;
- }
- }
- free(p_sys->flow->buffer);
- free(p_sys->flow);
- }
-}
static void Close(vlc_object_t *p_this)
{
- stream_t *p_access = (stream_t*)p_this;
+ stream_t *p_access = (stream_t*)p_this;
stream_sys_t *p_sys = p_access->p_sys;
-
- vlc_queue_Kill(&p_sys->queue, &p_sys->dead);
- vlc_join(p_sys->thread, NULL);
-
- Clean( p_access );
+ rist_destroy(p_sys->receiver_ctx);
}
static int Open(vlc_object_t *p_this)
{
stream_t *p_access = (stream_t*)p_this;
stream_sys_t *p_sys = NULL;
- vlc_url_t parsed_url = { 0 };
p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
if( unlikely( p_sys == NULL ) )
@@ -1057,55 +226,64 @@ static int Open(vlc_object_t *p_this)
vlc_mutex_init( &p_sys->lock );
- if ( vlc_UrlParse( &parsed_url, p_access->psz_url ) == -1 )
- {
- msg_Err( p_access, "Failed to parse input URL (%s)",
- p_access->psz_url );
- goto failed;
+ int rist_profile = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE);
+ p_sys->i_maximum_jitter = var_InheritInteger(p_access, RIST_CFG_PREFIX "maximum-jitter");
+ p_sys->gre_filter_dst_port = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_DST_PORT);
+ if (p_sys->gre_filter_dst_port % 2 != 0) {
+ msg_Err(p_access, "Virtual destination port must be an even number.");
+ return VLC_EGENERIC;
}
+ if (rist_profile == RIST_PROFILE_SIMPLE)
+ p_sys->gre_filter_dst_port = 0;
- /* Initialize rist flow */
- p_sys->b_ismulticast = is_multicast_address(parsed_url.psz_host);
- p_sys->flow = rist_udp_receiver(p_access, &parsed_url, p_sys->b_ismulticast);
- vlc_UrlClean( &parsed_url );
- if (!p_sys->flow)
- {
- msg_Err( p_access, "Failed to open rist flow (%s)",
- p_access->psz_url );
- goto failed;
+ int i_recovery_length = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_CFG_LATENCY);
+ if (i_recovery_length == 0) {
+ // Auto-configure the recovery buffer
+ i_recovery_length = 1000;//1 Second, libRIST default
}
+ p_sys->i_recovery_buffer = i_recovery_length;
- p_sys->b_flag_discontinuity = false;
- p_sys->b_disablenacks = var_InheritBool( p_access, "disable-nacks" );
- p_sys->b_sendblindnacks = var_InheritBool( p_access, "mcast-blind-nacks" );
- if (p_sys->b_sendblindnacks && p_sys->b_disablenacks == false)
- p_sys->b_sendnacks = true;
- else
- p_sys->b_sendnacks = false;
- p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
- p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
- p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
- p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
- p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
- p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
- if (p_sys->b_disablenacks)
- p_sys->flow->reorder_buffer = p_sys->flow->latency;
- else
- p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
- msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
+ int i_verbose_level = var_InheritInteger( p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL );
- /* Convert to rtp times */
- p_sys->flow->rtp_latency = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->latency));
- p_sys->flow->retry_interval = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->retry_interval));
- p_sys->flow->reorder_buffer = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->reorder_buffer));
+ //This simply disables the global logs, which are only used by the udpsocket functions provided
+ //by libRIST. When called by the library logs are anyway generated (though perhaps less accurate).
+ //This prevents all sorts of complications wrt other rist modules running and other the like.
+ struct rist_logging_settings rist_global_logging_settings = LOGGING_SETTINGS_INITIALIZER;
+ if (rist_logging_set_global(&rist_global_logging_settings) != 0) {
+ msg_Err(p_access,"Could not set logging\n");
+ return VLC_EGENERIC;
+ }
- p_sys->dead = false;
- vlc_queue_Init(&p_sys->queue, offsetof (block_t, p_next));
+ struct rist_logging_settings *logging_settings = &p_sys->logging_settings;
+ logging_settings->log_socket = -1;
+ logging_settings->log_stream = NULL;
+ logging_settings->log_level = i_verbose_level;
+ logging_settings->log_cb = log_cb;
+ logging_settings->log_cb_arg = p_access;
- /* This extra thread is for sending feedback/nack packets even when no data comes in */
- if (vlc_clone(&p_sys->thread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
- {
- msg_Err(p_access, "Failed to create worker thread.");
+ if (rist_receiver_create(&p_sys->receiver_ctx, rist_profile, logging_settings) != 0) {
+ msg_Err(p_access, "Could not create rist receiver context");
+ return VLC_EGENERIC;
+ }
+
+ int nack_type = var_InheritInteger( p_access, RIST_CFG_PREFIX "nack-type" );
+ if (rist_receiver_nack_type_set(p_sys->receiver_ctx, nack_type)) {
+ msg_Err(p_access, "Could not set nack type");
+ goto failed;
+ }
+
+ // Enable stats data
+ if (rist_stats_callback_set(p_sys->receiver_ctx, 1000, cb_stats, (void *)p_access) == -1) {
+ msg_Err(p_access, "Could not enable stats callback");
+ goto failed;
+ }
+
+ if( !rist_add_peers((vlc_object_t *)p_access, p_sys->receiver_ctx, p_access->psz_url, 0, RIST_DEFAULT_VIRT_DST_PORT, i_recovery_length) )
+ goto failed;
+
+ /* Start the rist protocol thread */
+ if (rist_start(p_sys->receiver_ctx)) {
+ msg_Err(p_access, "Could not start rist receiver");
goto failed;
}
@@ -1115,10 +293,16 @@ static int Open(vlc_object_t *p_this)
return VLC_SUCCESS;
failed:
- Clean( p_access );
+ rist_destroy(p_sys->receiver_ctx);
+ msg_Err(p_access, "Failed to open rist module");
return VLC_EGENERIC;
}
+#define DST_PORT_TEXT N_("Virtual Destination Port Filter")
+#define DST_PORT_LONGTEXT N_( \
+ "Destination port to be used inside the reduced-mode of the main profile "\
+ "to filter incoming data. Use zero to allow all." )
+
/* Module descriptor */
vlc_module_begin ()
@@ -1127,30 +311,54 @@ vlc_module_begin ()
set_category( CAT_INPUT )
set_subcategory( SUBCAT_INPUT_ACCESS )
- add_integer( "packet-size", RIST_MAX_PACKET_SIZE,
- N_("RIST maximum packet size (bytes)"), NULL )
- add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT,
+ add_integer( RIST_CFG_PREFIX "maximum-jitter", 5,
N_("RIST demux/decode maximum jitter (default is 5ms)"),
N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
- "The lower the value, the more CPU cycles the algorithm will consume") )
- add_integer( "latency", RIST_DEFAULT_LATENCY, N_("RIST latency (ms)"), NULL )
- add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL, N_("RIST nack retry interval (ms)"),
- NULL )
- add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
- NULL )
- add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL )
- add_integer( "nack-type", NACK_FMT_RANGE,
- N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL )
- change_integer_list( nack_type, nack_type_names )
- add_bool( "disable-nacks", false, N_("Disable NACK output packets"),
- N_("Use this to disable packet recovery") )
- add_bool( "mcast-blind-nacks", false, N_("Do not check for a valid rtcp message from the encoder"),
- N_("Send nack messages even when we have not confirmed that the encoder is on our local " \
- "network.") )
-
- set_capability( "access", 0 )
- add_shortcut( "rist", "tr06" )
+ "The lower the value, the more CPU cycles the module will consume"))
+ add_integer( RIST_CFG_PREFIX "nack-type", NACK_FMT_RANGE,
+ N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL)
+ change_integer_list( nack_type_values, nack_type_names )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_DST_PORT, 0,
+ DST_PORT_TEXT, DST_PORT_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_MAX_PACKET_SIZE, RIST_MAX_PACKET_SIZE,
+ RIST_PACKET_SIZE_TEXT, NULL )
+ add_string( RIST_CFG_PREFIX RIST_CFG_URL2, NULL, RIST_URL2_TEXT, NULL )
+ add_string( RIST_CFG_PREFIX RIST_CFG_URL3, NULL, RIST_URL3_TEXT, NULL )
+ add_string( RIST_CFG_PREFIX RIST_CFG_URL4, NULL, RIST_URL4_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_BANDWIDTH, RIST_DEFAULT_RECOVERY_MAXBITRATE,
+ RIST_MAX_BITRATE_TEXT, RIST_MAX_BITRATE_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_RETRY_INTERVAL, RIST_DEFAULT_RECOVERY_RTT_MIN,
+ RIST_RETRY_INTERVAL_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_REORDER_BUFFER, RIST_DEFAULT_RECOVERY_REORDER_BUFFER,
+ RIST_REORDER_BUFFER_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_MAX_RETRIES, RIST_DEFAULT_MAX_RETRIES,
+ RIST_MAX_RETRIES_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL, RIST_DEFAULT_VERBOSE_LEVEL,
+ RIST_VERBOSE_LEVEL_TEXT, RIST_VERBOSE_LEVEL_LONGTEXT )
+ change_integer_list( verbose_level_type, verbose_level_type_names )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_LATENCY, 0,
+ BUFFER_TEXT, BUFFER_LONGTEXT )
+ add_string( RIST_CFG_PREFIX RIST_URL_PARAM_CNAME, NULL, RIST_CNAME_TEXT,
+ RIST_CNAME_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE, RIST_DEFAULT_PROFILE,
+ RIST_PROFILE_TEXT, RIST_PROFILE_LONGTEXT )
+ add_password( RIST_CFG_PREFIX RIST_URL_PARAM_SECRET, "",
+ RIST_SHARED_SECRET_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_AES_TYPE, 0,
+ RIST_ENCRYPTION_TYPE_TEXT, NULL )
+ change_integer_list( rist_encryption_type, rist_encryption_type_names )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_TIMING_MODE, RIST_DEFAULT_TIMING_MODE,
+ RIST_TIMING_MODE_TEXT, NULL )
+ change_integer_list( rist_timing_mode_type, rist_timing_mode_names )
+ add_string( RIST_CFG_PREFIX RIST_URL_PARAM_SRP_USERNAME, "",
+ RIST_SRP_USERNAME_TEXT, NULL )
+ add_password( RIST_CFG_PREFIX RIST_URL_PARAM_SRP_PASSWORD, "",
+ RIST_SRP_PASSWORD_TEXT, NULL )
+
+ set_capability( "access", 10 )
+ add_shortcut( "librist", "rist", "tr06" )
set_callbacks( Open, Close )
vlc_module_end ()
+
=====================================
modules/access/rist.h
=====================================
@@ -1,11 +1,9 @@
/*****************************************************************************
* rist.h: RIST (Reliable Internet Stream Transport) helper
*****************************************************************************
- * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
- * Copyright (C) 2018, SipRadius LLC
+ * Copyright (C) 2020, SipRadius LLC
*
* Authors: Sergio Ammirata <sergio at ammirata.net>
- * Daniele Lacamera <root at danielinux.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
@@ -22,346 +20,223 @@
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
-#include <stdint.h>
-#ifdef HAVE_ARPA_INET_H
-#include <arpa/inet.h>
-#endif
-#include <errno.h>
-#include <assert.h>
+#ifndef RIST_VLC_COMMON_H
+#define RIST_VLC_COMMON_H 1
-/*****************************************************************************
- * Public API
- *****************************************************************************/
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
-/* RIST */
-
-/* RTP header format (RFC 3550) */
-/*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- |V=2|P|X| CC |M| PT | sequence number |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | timestamp |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | synchronization source (SSRC) identifier |
- +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
- | contributing source (CSRC) identifiers |
- | .... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-*/
-
-#define RIST_QUEUE_SIZE 65536
-#define RTP_PKT_SIZE (1472)
-
-#define RTCP_INTERVAL 75 /*ms*/
-
-#define SEVENTY_YEARS_OFFSET (2208988800ULL)
-#define MAX_NACKS 32
-#define MAX_CNAME 128
-#define RTCP_EMPTY_RR_SIZE 8
-
-#define RTCP_PT_RTPFR 204
-
-struct rtp_pkt {
- uint32_t rtp_ts;
- struct block_t *buffer;
+#include <vlc_common.h>
+#include <librist/librist.h>
+
+/* RIST max fifo count (private RIST_DATAOUT_QUEUE_BUFFERS in librist) */
+#define RIST_MAX_QUEUE_BUFFERS (1024)
+
+/* RIST parameter names */
+#define RIST_CFG_MAX_PACKET_SIZE "packet-size"
+#define RIST_CFG_URL2 "peer-url2"
+#define RIST_CFG_URL3 "peer-url3"
+#define RIST_CFG_URL4 "peer-url4"
+#define RIST_CFG_RETRY_INTERVAL "retry-interval"
+#define RIST_CFG_MAX_RETRIES "max-retries"
+#define RIST_CFG_LATENCY "latency"
+
+static const char *rist_log_label[9] = {"DISABLED", "", "", "ERROR", "WARN", "", "INFO", "DEBUG", "SIMULATE"};
+
+/* RIST parameter descriptions */
+#define RIST_URL2_TEXT N_("Second peer URL")
+#define RIST_URL3_TEXT N_("Third peer URL")
+#define RIST_URL4_TEXT N_("Fourth peer URL")
+#define RIST_MAX_BITRATE_TEXT N_("Max bitrate in Kbps")
+#define RIST_MAX_BITRATE_LONGTEXT N_( \
+ "Use this value to guarantee that data+retries bitrate never exceeds your pipe size. " \
+ "Default value is 100000 Kbps (100 Mbps)" )
+#define RIST_RETRY_INTERVAL_TEXT N_("RIST nack minimum retry interval (ms)")
+#define RIST_REORDER_BUFFER_TEXT N_("RIST reorder buffer size (ms)")
+#define RIST_MAX_RETRIES_TEXT N_("RIST maximum retry count")
+#define BUFFER_TEXT N_("RIST retry-buffer queue size (ms)")
+#define BUFFER_LONGTEXT N_( \
+ "This must match the buffer size (latency) configured on the other side. If you " \
+ "are not sure, leave it blank and it will use 1000ms" )
+#define RIST_SHARED_SECRET_TEXT N_("Shared Secret")
+#define RIST_SHARED_SECRET_LONGTEXT N_( \
+ "This shared secret is a passphare shared between sender and receiver. The AES key " \
+ "is derived from it" )
+#define RIST_CNAME_TEXT N_("Peer cname")
+#define RIST_CNAME_LONGTEXT N_( \
+ "This name will be sent using the rist RTCP channel and uniquely identifies us" )
+#define RIST_PACKET_SIZE_TEXT N_("RIST maximum packet size (bytes)")
+#define RIST_SRP_USERNAME_TEXT N_("Username used for stream authentication")
+#define RIST_SRP_PASSWORD_TEXT N_("Password used for stream authentication")
+/* Profile selection */
+#define RIST_PROFILE_TEXT N_("Rist Profile")
+#define RIST_PROFILE_LONGTEXT N_( "Select the rist profile to use" )
+static const int rist_profile[] = { 0, 1, 2 };
+static const char *const rist_profile_names[] = {
+ N_("Simple Profile"), N_("Main Profile"), N_("Advanced Profile"),
};
-
-/* RIST NACK header format */
-/*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | SNBase low bits | Length recovery |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- |E| PT recovery | Mask |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | TS recovery |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- |N|D|type |index| Offset | NA |SNBase ext bits|
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-*/
-
-struct rist_flow {
- uint8_t reset;
- struct rtp_pkt *buffer;
- uint32_t qsize;
- uint32_t last_out;
- uint32_t ssrc;
- char cname[MAX_CNAME];
- struct sockaddr_storage peer_sockaddr;
- socklen_t peer_socklen;
- uint16_t ri, wi;
- int fd_in;
- int fd_out;
- int fd_rtcp;
- int fd_rtcp_m;
- int fd_nack;
- uint8_t nacks_retries[RIST_QUEUE_SIZE];
- uint32_t hi_timestamp;
- uint64_t feedback_time;
- uint32_t latency;
- uint32_t rtp_latency;
- uint32_t retry_interval;
- uint32_t reorder_buffer;
- uint8_t max_retries;
- uint32_t packets_count;
- uint32_t bytes_count;
+/* Encryption type */
+#define RIST_ENCRYPTION_TYPE_TEXT N_("Encryption type")
+#define RIST_DEFAULT_ENCRYPTION_TPE 128
+static const int rist_encryption_type[] = { 0, 128, 256, };
+static const char *const rist_encryption_type_names[] = {
+ N_("Disabled"), N_("AES 128 bits"), N_("AES 256 bits"),
+};
+/* Timing Mode */
+#define RIST_TIMING_MODE_TEXT N_("Timing mode")
+static const int rist_timing_mode_type[] = { 0, 1, 2, };
+static const char *const rist_timing_mode_names[] = {
+ N_("Use Source Time"), N_("Use Arrival Time"), N_("Use RTC"),
+};
+/* Verbose level */
+#define RIST_VERBOSE_LEVEL_TEXT N_("Verbose level")
+#define RIST_VERBOSE_LEVEL_LONGTEXT N_("This controls how much log data the library will output over stderr")
+static const int verbose_level_type[] = { RIST_LOG_DISABLE, RIST_LOG_ERROR, RIST_LOG_WARN, RIST_LOG_INFO, RIST_LOG_DEBUG, RIST_LOG_SIMULATE };
+static const char *const verbose_level_type_names[] = {
+ N_("Quiet"), N_("Errors"), N_("Warnings"), N_("Info"), N_("Debug"), N_("Simulate-Loss"),
};
-static inline uint16_t rtcp_fb_nack_get_range_start(const uint8_t *p_rtcp_fb_nack)
-{
- return (p_rtcp_fb_nack[0] << 8) | p_rtcp_fb_nack[1];
-}
+/* Max number of peers */
+// This will restrict the use of the library to the configured maximum packet size
+#define RIST_MAX_PACKET_SIZE (10000)
-static inline uint16_t rtcp_fb_nack_get_range_extra(const uint8_t *p_rtcp_fb_nack)
-{
- return (p_rtcp_fb_nack[2] << 8) | p_rtcp_fb_nack[3];
-}
+#define RIST_MAX_LOG_BUFFER 512
-static inline void rtcp_fb_nack_set_range_start(uint8_t *p_rtcp_fb_nack,
- uint16_t start)
-{
- p_rtcp_fb_nack[0] = (start >> 8) & 0xff;
- p_rtcp_fb_nack[1] = start & 0xff;
-}
-static inline void rtcp_fb_nack_set_range_extra(uint8_t *p_rtcp_fb_nack,
- uint16_t extra)
+static inline int log_cb(void *arg, enum rist_log_level log_level, const char *msg)
{
- p_rtcp_fb_nack[2] = (extra >> 8) & 0xff;
- p_rtcp_fb_nack[3] = extra & 0xff;
+ vlc_object_t *p_access = (vlc_object_t *)arg;
+ int label_index = 0;
+ if (log_level > 8)
+ label_index = 8;
+ else if (log_level > 0)
+ label_index = log_level;
+
+ if (log_level == RIST_LOG_ERROR)
+ msg_Err(p_access, "[RIST-%s]: %.*s", rist_log_label[label_index], (int)strlen(msg) - 1, msg);
+ else if (log_level == RIST_LOG_WARN)
+ msg_Warn(p_access, "[RIST-%s]: %.*s", rist_log_label[label_index], (int)strlen(msg) - 1, msg);
+ else if (log_level == RIST_LOG_INFO)
+ //libRIST follows Syslog log priorities and assigns INFO a low prio, VLC gives Info the highest
+ //so make it warn (debug would be too verbose)
+ msg_Warn(p_access, "[RIST-%s]: %.*s", rist_log_label[label_index], (int)strlen(msg) - 1, msg);
+ else if (log_level > RIST_LOG_DEBUG)
+ msg_Dbg(p_access, "[RIST-%s]: %.*s", rist_log_label[label_index], (int)strlen(msg) - 1, msg);
+
+ return 0;
}
-static inline void populate_cname(int fd, char *identifier)
+static inline int rist_get_max_packet_size(vlc_object_t *p_this)
{
- /* Set the CNAME Identifier as host at ip:port and fallback to hostname if needed */
- char hostname[MAX_CNAME];
- struct sockaddr_storage peer_sockaddr;
- int name_length = 0;
- socklen_t peer_socklen = 0;
- int ret_hostname = gethostname(hostname, MAX_CNAME);
- if (ret_hostname == -1)
- snprintf(hostname, MAX_CNAME, "UnknownHost");
- int ret_sockname = getsockname(fd, (struct sockaddr *)&peer_sockaddr, &peer_socklen);
- if (ret_sockname == 0)
- {
- struct sockaddr *peer = (struct sockaddr *)&peer_sockaddr;
- if (peer->sa_family == AF_INET) {
- struct sockaddr_in *xin = (void*)peer;
- name_length = snprintf(identifier, MAX_CNAME, "%s@%s:%u", hostname,
- inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
- if (name_length >= MAX_CNAME)
- identifier[MAX_CNAME-1] = 0;
- } else if (peer->sa_family == AF_INET6) {
- struct sockaddr_in6 *xin6 = (void*)peer;
- char str[INET6_ADDRSTRLEN];
- inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
- name_length = snprintf(identifier, MAX_CNAME, "%s@%s:%u", hostname,
- str, ntohs(xin6->sin6_port));
- if (name_length >= MAX_CNAME)
- identifier[MAX_CNAME-1] = 0;
- }
- }
- if (name_length == 0)
+ int i_max_packet_size = var_InheritInteger( p_this, RIST_CFG_PREFIX RIST_CFG_MAX_PACKET_SIZE );
+ if (i_max_packet_size > RIST_MAX_PACKET_SIZE)
{
- name_length = snprintf(identifier, MAX_CNAME, "%s", hostname);
- if (name_length >= MAX_CNAME)
- identifier[MAX_CNAME-1] = 0;
+ msg_Err(p_this, "The maximum packet size configured is bigger than what the library allows %d > %d, using %d instead",
+ i_max_packet_size, RIST_MAX_PACKET_SIZE, RIST_MAX_PACKET_SIZE);
+ i_max_packet_size = RIST_MAX_PACKET_SIZE;
}
+ return i_max_packet_size;
}
-static inline uint32_t rtp_get_ts( vlc_tick_t i_pts )
+static inline bool rist_add_peers(vlc_object_t *p_this, struct rist_ctx *ctx, char *psz_url, int i_multipeer_mode, int virt_dst_port, int i_recovery_length)
{
- unsigned i_clock_rate = 90000;
- /* This is an overflow-proof way of doing:
- * return i_pts * (int64_t)i_clock_rate / CLOCK_FREQ;
- *
- * NOTE: this plays nice with offsets because the (equivalent)
- * calculations are linear. */
- lldiv_t q = lldiv(i_pts, CLOCK_FREQ);
- return q.quot * (int64_t)i_clock_rate
- + q.rem * (int64_t)i_clock_rate / CLOCK_FREQ;
-}
-static inline vlc_tick_t ts_get_from_rtp( uint32_t i_rtp_ts )
-{
- unsigned i_clock_rate = 90000;
- return (vlc_tick_t)i_rtp_ts * (vlc_tick_t)(CLOCK_FREQ/i_clock_rate);
-}
-static inline ssize_t rist_ReadFrom_i11e(int fd, void *buf, size_t len, struct sockaddr *peer,
- socklen_t *slen)
-{
- ssize_t ret = -1;
+ int i_rist_reorder_buffer = var_InheritInteger( p_this, RIST_CFG_PREFIX RIST_URL_PARAM_REORDER_BUFFER );
+ int i_rist_retry_interval = var_InheritInteger( p_this, RIST_CFG_PREFIX RIST_CFG_RETRY_INTERVAL );
+ int i_rist_max_retries = var_InheritInteger( p_this, RIST_CFG_PREFIX RIST_CFG_MAX_RETRIES );
+ int i_rist_max_bitrate = var_InheritInteger(p_this, RIST_CFG_PREFIX RIST_URL_PARAM_BANDWIDTH);
- if (peer == NULL)
- ret = vlc_recv_i11e(fd, buf, len, 0);
- else
- ret = vlc_recvfrom_i11e(fd, buf, len, 0, peer, slen);
+ char *psz_stream_name = NULL;
+ psz_stream_name = var_InheritString( p_this, RIST_CFG_PREFIX RIST_URL_PARAM_CNAME );
- if (ret == -1)
- {
- switch (errno)
- {
- case EAGAIN:
- case EINTR:
- if (vlc_killed())
- return ret;
-
- /* retry one time */
- if (peer == NULL)
- ret = vlc_recv_i11e(fd, buf, len, 0);
- else
- ret = vlc_recvfrom_i11e(fd, buf, len, 0, peer, slen);
- default:
- break;
- }
- }
- return ret;
-}
+ char *psz_shared_secret = NULL;
+ psz_shared_secret = var_InheritString( p_this, RIST_CFG_PREFIX RIST_URL_PARAM_SECRET );
-static inline ssize_t rist_Read_i11e(int fd, void *buf, size_t len)
-{
- return rist_ReadFrom_i11e(fd, buf, len, NULL, NULL);
-}
+ int i_key_size = var_InheritInteger(p_this, RIST_CFG_PREFIX RIST_URL_PARAM_AES_TYPE);
-static inline ssize_t rist_ReadFrom(int fd, void *buf, size_t len, struct sockaddr *peer,
- socklen_t *slen)
-{
- ssize_t ret = -1;
+ char *psz_srp_username = NULL;
+ psz_srp_username = var_InheritString( p_this, RIST_CFG_PREFIX RIST_URL_PARAM_SRP_USERNAME );
+ char *psz_srp_password = NULL;
+ psz_srp_password = var_InheritString( p_this, RIST_CFG_PREFIX RIST_URL_PARAM_SRP_PASSWORD );
- if (peer == NULL)
- ret = recv(fd, buf, len, 0);
- else
- ret = recvfrom(fd, buf, len, 0, peer, slen);
+ int recovery_mode = RIST_RECOVERY_MODE_TIME;
- if (ret == -1)
- {
- switch (errno)
- {
- case EAGAIN:
- case EINTR:
- /* retry one time */
- if (peer == NULL)
- ret = recv(fd, buf, len, 0);
- else
- ret = recvfrom(fd, buf, len, 0, peer, slen);
- default:
- break;
- }
- }
- return ret;
-}
+ msg_Info( p_this, "Setting retry buffer to %d ms", i_recovery_length );
-static inline ssize_t rist_Read(int fd, void *buf, size_t len)
-{
- return rist_ReadFrom(fd, buf, len, NULL, NULL);
-}
+ char *addr[] = {
+ strdup(psz_url),
+ var_InheritString( p_this, RIST_CFG_PREFIX RIST_CFG_URL2 ),
+ var_InheritString( p_this, RIST_CFG_PREFIX RIST_CFG_URL3 ),
+ var_InheritString( p_this, RIST_CFG_PREFIX RIST_CFG_URL4 )
+ };
-static inline ssize_t rist_WriteTo_i11e(int fd, const void *buf, size_t len,
- const struct sockaddr *peer, socklen_t slen)
-{
-#ifdef _WIN32
- # define ENOBUFS WSAENOBUFS
- # define EAGAIN WSAEWOULDBLOCK
- # define EWOULDBLOCK WSAEWOULDBLOCK
-#endif
- ssize_t r = -1;
- if (slen == 0)
- r = vlc_send_i11e( fd, buf, len, 0 );
- else
- r = vlc_sendto_i11e( fd, buf, len, 0, peer, slen );
- if( r == -1
- && net_errno != EAGAIN && net_errno != EWOULDBLOCK
- && net_errno != ENOBUFS && net_errno != ENOMEM )
- {
- int type;
- if (!getsockopt( fd, SOL_SOCKET, SO_TYPE,
- &type, &(socklen_t){ sizeof(type) }))
- {
- if( type == SOCK_DGRAM )
- {
- /* ICMP soft error: ignore and retry */
- if (slen == 0)
- r = vlc_send_i11e( fd, buf, len, 0 );
- else
- r = vlc_sendto_i11e( fd, buf, len, 0, peer, slen );
- }
+ bool b_peer_alive = false;
+ for (size_t i = 0; i < ARRAY_SIZE(addr); i++) {
+ if (addr[i] == NULL) {
+ continue;
+ }
+ else if (addr[i][0] == '\0') {
+ free(addr[i]);
+ continue;
}
- }
- return r;
-}
-static inline ssize_t rist_Write_i11e(int fd, const void *buf, size_t len)
-{
- return rist_WriteTo_i11e(fd, buf, len, NULL, 0);
-}
+ struct rist_peer_config app_peer_config = {
+ .version = RIST_PEER_CONFIG_VERSION,
+ .virt_dst_port = virt_dst_port,
+ .recovery_mode = recovery_mode,
+ .recovery_maxbitrate = (uint32_t)i_rist_max_bitrate,
+ .recovery_maxbitrate_return = 0,
+ .recovery_length_min = (uint32_t)i_recovery_length,
+ .recovery_length_max = (uint32_t)i_recovery_length,
+ .recovery_reorder_buffer = (uint32_t)i_rist_reorder_buffer,
+ .recovery_rtt_min = (uint32_t)i_rist_retry_interval,
+ .recovery_rtt_max = (uint32_t)10*i_rist_retry_interval,
+ .weight = (uint32_t)i_multipeer_mode,
+ .congestion_control_mode = RIST_CONGESTION_CONTROL_MODE_NORMAL,
+ .min_retries = 6,
+ .max_retries = (uint32_t)i_rist_max_retries,
+ .key_size = i_key_size
+ };
+
+ if (psz_shared_secret != NULL && psz_shared_secret[0] != '\0') {
+ strlcpy(app_peer_config.secret, psz_shared_secret, sizeof(app_peer_config.secret));
+ }
-static inline ssize_t rist_WriteTo(int fd, const void *buf, size_t len, const struct sockaddr *peer,
- socklen_t slen)
-{
-#ifdef _WIN32
- # define ENOBUFS WSAENOBUFS
- # define EAGAIN WSAEWOULDBLOCK
- # define EWOULDBLOCK WSAEWOULDBLOCK
-#endif
- ssize_t r = -1;
- if (slen == 0)
- r = send( fd, buf, len, 0 );
- else
- r = sendto( fd, buf, len, 0, peer, slen );
- if( r == -1
- && net_errno != EAGAIN && net_errno != EWOULDBLOCK
- && net_errno != ENOBUFS && net_errno != ENOMEM )
- {
- int type;
- if (!getsockopt( fd, SOL_SOCKET, SO_TYPE,
- &type, &(socklen_t){ sizeof(type) }))
+ if ( psz_stream_name != NULL && psz_stream_name[0] != '\0') {
+ strlcpy(app_peer_config.cname, psz_stream_name, sizeof(app_peer_config.cname));
+ }
+
+ // URL overrides (also cleans up the URL)
+ struct rist_peer_config *peer_config = &app_peer_config;
+ if (rist_parse_address2(addr[i], &peer_config))
{
- if( type == SOCK_DGRAM )
- {
- /* ICMP soft error: ignore and retry */
- if (slen == 0)
- r = send( fd, buf, len, 0 );
- else
- r = sendto( fd, buf, len, 0, peer, slen );
- }
+ msg_Err( p_this, "Could not parse peer options for sender #%d\n", (int)(i + 1));
+ free(addr[i]);
+ continue;
}
+
+ struct rist_peer *peer;
+ if (rist_peer_create(ctx, &peer, peer_config)) {
+ msg_Err( p_this, "Could not init rist sender #%i at %s",(int)(i + 1), addr[i] );
+ free(addr[i]);
+ continue;
+ }
+ else {
+ b_peer_alive = true;
+ }
+ free(addr[i]);
}
- return r;
-}
-static inline ssize_t rist_Write(int fd, const void *buf, size_t len)
-{
- return rist_WriteTo(fd, buf, len, NULL, 0);
-}
+ free(psz_shared_secret);
+ free(psz_stream_name);
+ free(psz_srp_username);
+ free(psz_srp_password);
-static bool is_multicast_address(char *psz_dst_server)
-{
- int ret;
- int ismulticast = 0;
-
- struct addrinfo hint = {
- .ai_socktype = SOCK_DGRAM,
- .ai_protocol = IPPROTO_UDP,
- .ai_flags = AI_NUMERICSERV | AI_IDN | AI_PASSIVE,
- }, *res;
-
- ret = vlc_getaddrinfo(psz_dst_server, 0, &hint, &res);
- if (ret) {
- return 0;
- } else if(res->ai_family == AF_INET) {
- unsigned long addr = ntohl(inet_addr(psz_dst_server));
- ismulticast = IN_MULTICAST(addr);
- } else if (res->ai_family == AF_INET6) {
- if (strlen(psz_dst_server) >= 5 && (strncmp("[ff00", psz_dst_server, 5) == 0 ||
- strncmp("[FF00", psz_dst_server, 5) == 0))
- ismulticast = 1;
- }
+ return b_peer_alive;
- freeaddrinfo(res);
+}
+
+#endif
- return ismulticast;
-}
\ No newline at end of file
=====================================
modules/access_output/Makefile.am
=====================================
@@ -41,9 +41,10 @@ access_out_LTLIBRARIES += $(LTLIBaccess_output_srt)
EXTRA_LTLIBRARIES += libaccess_output_srt_plugin.la
### RIST ###
-libaccess_output_rist_plugin_la_SOURCES = access_output/rist.c access/rist.h
-libaccess_output_rist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
-libaccess_output_rist_plugin_la_LIBADD = $(SOCKET_LIBS)
-if HAVE_BITSTREAM
-access_out_LTLIBRARIES += libaccess_output_rist_plugin.la
-endif
+
+libaccess_output_rist_plugin_la_SOURCES = access_output/rist.c
+libaccess_output_rist_plugin_la_CFLAGS = $(AM_CFLAGS) $(RIST_CFLAGS)
+libaccess_output_rist_plugin_la_LIBADD = $(RIST_LIBS) $(SOCKET_LIBS) $(LIBPTHREAD)
+libaccess_output_rist_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(access_outdir)'
+access_out_LTLIBRARIES += $(LTLIBaccess_output_rist)
+EXTRA_LTLIBRARIES += libaccess_output_rist_plugin.la
=====================================
modules/access_output/rist.c
=====================================
@@ -1,11 +1,9 @@
/*****************************************************************************
- * * rist.c: RIST (Reliable Internet Stream Transport) output module
+ * rist.c: RIST (Reliable Internet Stream Transport) output module
*****************************************************************************
- * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
- * Copyright (C) 2018, SipRadius LLC
+ * Copyright (C) 2021, SipRadius LLC
*
* Authors: Sergio Ammirata <sergio at ammirata.net>
- * Daniele Lacamera <root at danielinux.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
@@ -28,568 +26,75 @@
#include <vlc_common.h>
#include <vlc_interrupt.h>
-#include <vlc_fs.h>
#include <vlc_plugin.h>
#include <vlc_sout.h>
#include <vlc_block.h>
-#include <vlc_network.h>
-#include <vlc_queue.h>
-#include <vlc_threads.h>
#include <vlc_rand.h>
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
#include <sys/time.h>
-#ifdef HAVE_SYS_SOCKET_H
-#include <sys/socket.h>
-#endif
-#include <bitstream/ietf/rtcp_rr.h>
-#include <bitstream/ietf/rtcp_sr.h>
-#include <bitstream/ietf/rtcp_fb.h>
-#include <bitstream/ietf/rtcp_sdes.h>
-#include <bitstream/ietf/rtp.h>
+#define RIST_CFG_PREFIX "sout-rist-"
#include "../access/rist.h"
-/* Uncomment the following to introduce induced packet loss for TESTING purposes only */
-/*#define TEST_PACKET_LOSS*/
-
-/* The default target packet size */
-#define RIST_TARGET_PACKET_SIZE 1328
-/* The default caching delay for output data */
-#define DEFAULT_CACHING_DELAY 50
-/* The default buffer size in ms */
-#define DEFAULT_BUFFER_SIZE 0
-/* Calculate and print stats once per second */
-#define STATS_INTERVAL 1000 /*ms*/
-
-#define MPEG_II_TRANSPORT_STREAM (0x21)
-#define RIST_DEFAULT_PORT 1968
-
-#define SOUT_CFG_PREFIX "sout-rist-"
-
static const char *const ppsz_sout_options[] = {
- "packet-size",
- "caching",
- "buffer-size",
- "ssrc",
- "stream-name",
+ RIST_CFG_MAX_PACKET_SIZE,
+ RIST_URL_PARAM_VIRT_SRC_PORT,
+ RIST_URL_PARAM_VIRT_DST_PORT,
+ RIST_CFG_LATENCY,
+ "multipeer-mode",
+ RIST_CFG_URL2,
+ RIST_CFG_URL3,
+ RIST_CFG_URL4,
+ RIST_URL_PARAM_BANDWIDTH,
+ RIST_CFG_RETRY_INTERVAL,
+ RIST_URL_PARAM_REORDER_BUFFER,
+ RIST_CFG_MAX_RETRIES,
+ RIST_URL_PARAM_VERBOSE_LEVEL,
+ RIST_URL_PARAM_CNAME,
+ RIST_URL_PARAM_PROFILE,
+ RIST_URL_PARAM_SECRET,
+ RIST_URL_PARAM_AES_TYPE,
+ RIST_URL_PARAM_SRP_USERNAME,
+ RIST_URL_PARAM_SRP_PASSWORD,
NULL
};
typedef struct
{
- struct rist_flow *flow;
- uint16_t rtp_counter;
- char receiver_name[MAX_CNAME];
- uint64_t last_rtcp_tx;
- vlc_thread_t ristthread;
- vlc_thread_t senderthread;
- size_t i_packet_size;
- bool b_mtu_warning;
- bool b_ismulticast;
- vlc_mutex_t lock;
- vlc_mutex_t fd_lock;
- block_t *p_pktbuffer;
- uint64_t i_ticks_caching;
- uint32_t ssrc;
- bool dead;
- vlc_queue_t queue;
- /* stats variables */
- uint64_t i_last_stat;
- uint32_t i_retransmit_packets;
- uint32_t i_total_packets;
+ struct rist_ctx *sender_ctx;
+ int gre_src_port;
+ int gre_dst_port;
+ uint32_t i_recovery_buffer;
+ size_t i_max_packet_size;
+ struct rist_logging_settings logging_settings;
} sout_access_out_sys_t;
-static struct rist_flow *rist_init_tx()
-{
- struct rist_flow *flow = calloc(1, sizeof(struct rist_flow));
- if (!flow)
- return NULL;
-
- flow->reset = 1;
- flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
- if ( unlikely( flow->buffer == NULL ) )
- {
- free(flow);
- return NULL;
- }
- flow->fd_out = -1;
- flow->fd_rtcp = -1;
- flow->fd_rtcp_m = -1;
-
- return flow;
-}
-
-static struct rist_flow *rist_udp_transmitter(sout_access_out_t *p_access, char *psz_dst_server,
- int i_dst_port, bool b_ismulticast)
-{
- struct rist_flow *flow;
- flow = rist_init_tx();
- if (!flow)
- return NULL;
-
- flow->fd_out = net_ConnectDgram(p_access, psz_dst_server, i_dst_port, -1, IPPROTO_UDP );
- if (flow->fd_out < 0)
- {
- msg_Err( p_access, "cannot open output socket" );
- goto fail;
- }
-
- if (b_ismulticast) {
- flow->fd_rtcp_m = net_OpenDgram(p_access, psz_dst_server, i_dst_port + 1,
- NULL, 0, IPPROTO_UDP);
- if (flow->fd_rtcp_m < 0)
- {
- msg_Err( p_access, "cannot open multicast nack socket" );
- goto fail;
- }
- }
-
- flow->fd_rtcp = net_ConnectDgram(p_access, psz_dst_server, i_dst_port + 1, -1, IPPROTO_UDP );
- if (flow->fd_rtcp < 0)
- {
- msg_Err( p_access, "cannot open nack socket" );
- goto fail;
- }
-
- char *psz_streamname = NULL;
- psz_streamname = var_InheritString( p_access, SOUT_CFG_PREFIX "stream-name" );
- if ( psz_streamname != NULL && psz_streamname[0] != '\0')
- {
- int name_length = snprintf(flow->cname, MAX_CNAME, "%s", psz_streamname);
- if (name_length >= MAX_CNAME)
- flow->cname[MAX_CNAME-1] = 0;
- free( psz_streamname );
- }
- else
- populate_cname(flow->fd_rtcp, flow->cname);
-
- msg_Info(p_access, "our cname is %s", flow->cname);
-
- return flow;
-
-fail:
- if (flow->fd_out != -1)
- vlc_close(flow->fd_out);
- if (flow->fd_rtcp != -1)
- vlc_close(flow->fd_rtcp);
- if (flow->fd_rtcp_m != -1)
- vlc_close(flow->fd_rtcp_m);
- free(flow->buffer);
- free(flow);
- return NULL;
-}
-
-static void rist_retransmit(sout_access_out_t *p_access, struct rist_flow *flow, uint16_t seq)
-{
- sout_access_out_sys_t *p_sys = p_access->p_sys;
- struct rtp_pkt *pkt = &(flow->buffer[seq]);
- if (pkt->buffer == NULL)
- {
- msg_Err(p_access, "RIST recovery: missing requested packet %d, buffer not yet full", seq);
- return;
- }
-
- /* Mark SSID for retransmission (change the last bit of the ssrc to 1) */
- pkt->buffer->p_buffer[11] |= (1 << 0);
-#ifdef TEST_PACKET_LOSS
-# warning COMPILED WITH SELF INFLICTED PACKET LOSS
- if ((flow->packets_count % 14) == 0) {
- return;
- }
-#endif
- uint32_t rtp_age = flow->hi_timestamp - pkt->rtp_ts;
- uint64_t age = ts_get_from_rtp(rtp_age)/1000;
- if (flow->rtp_latency > 0 && rtp_age > flow->rtp_latency)
- {
- msg_Err(p_access, " Not Sending Nack #%d, too old (age %"PRId64" ms), current seq is:" \
- " [%d]. Perhaps you should increase the buffer-size ...", seq, age, flow->wi);
- }
- else
- {
- msg_Dbg(p_access, " Sending Nack #%d (age %"PRId64" ms), current seq is: [%d]",
- seq, age, flow->wi);
- p_sys->i_retransmit_packets++;
- vlc_mutex_lock( &p_sys->fd_lock );
- if (rist_Write(flow->fd_out, pkt->buffer->p_buffer, pkt->buffer->i_buffer)
- != (ssize_t)pkt->buffer->i_buffer) {
- msg_Err(p_access, "Error sending retransmitted packet after 2 tries ...");
- }
-
- vlc_mutex_unlock( &p_sys->fd_lock );
- }
-}
-
-static void process_nack(sout_access_out_t *p_access, uint8_t ptype, uint16_t nrecords,
- struct rist_flow *flow, uint8_t *pkt)
-{
- sout_access_out_sys_t *p_sys = p_access->p_sys;
- int i,j;
-
- /*msg_Info(p_access, " Nack (BbRR), %d record(s), Window: [%d:%d-->%d]", nrecords,
- flow->ri, flow->wi, flow->wi-flow->ri);*/
-
- if (ptype == RTCP_PT_RTPFR)
- {
- uint8_t pi_ssrc[4];
- rtcp_fb_get_ssrc_media_src(pkt, pi_ssrc);
- if (memcmp(pi_ssrc, "RIST", 4) != 0)
- {
- msg_Info(p_access, " Ignoring Nack with name %s", pi_ssrc);
- return; /* Ignore app-type not RIST */
- }
-
- for (i = 0; i < (nrecords-2); i++) {
- uint16_t missing;
- uint16_t additional;
- uint8_t *rtp_nack_record = (pkt + 12 + i * 4);
- missing = rtcp_fb_nack_get_range_start(rtp_nack_record);
- additional = rtcp_fb_nack_get_range_extra(rtp_nack_record);
- /*msg_Info(p_access, " Nack (Range), %d, current seq is: [%d]", missing, flow->wi);*/
- vlc_mutex_lock( &p_sys->lock );
- rist_retransmit(p_access, flow, missing);
- for (j = 0; j < additional; j++) {
- rist_retransmit(p_access, flow, missing + j + 1);
- }
- vlc_mutex_unlock( &p_sys->lock );
- }
- }
- else if (ptype == RTCP_PT_RTPFB)
- {
- for (i = 0; i < (nrecords-2); i++) {
- uint16_t missing;
- uint16_t bitmask;
- uint8_t *rtp_nack_record = (pkt + 12 + i * 4);
- missing = rtcp_fb_nack_get_packet_id(rtp_nack_record);
- bitmask = rtcp_fb_nack_get_bitmask_lost(rtp_nack_record);
- /*msg_Info(p_access, " Nack (Bitmask), %d, current seq is: [%d]", missing, flow->wi);*/
- vlc_mutex_lock( &p_sys->lock );
- rist_retransmit(p_access, flow, missing);
- for (j = 0; j < 16; j++) {
- if ((bitmask & (1 << j)) == (1 << j)) {
- rist_retransmit(p_access, flow, missing + j + 1);
- }
- }
- vlc_mutex_unlock( &p_sys->lock );
- }
- }
- else
- {
- msg_Err(p_access, " !!! Wrong feedback. Ptype is %02x!=%02x, FMT: %02x", ptype,
- RTCP_PT_RTPFR, rtcp_fb_get_fmt(pkt));
- }
-}
-
-static void rist_rtcp_recv(sout_access_out_t *p_access, struct rist_flow *flow, uint8_t *pkt_raw,
- size_t len)
-{
- sout_access_out_sys_t *p_sys = p_access->p_sys;
- uint8_t *pkt = pkt_raw;
- uint8_t ptype;
- uint16_t processed_bytes = 0;
- uint16_t records;
-
- while (processed_bytes < len) {
- pkt = pkt_raw + processed_bytes;
- /* safety checks */
- uint16_t bytes_left = len - processed_bytes + 1;
- if ( bytes_left < 4 )
- {
- /* we must have at least 4 bytes */
- msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d",
- bytes_left);
- return;
- }
- else if (!rtp_check_hdr(pkt))
- {
- /* check for a valid rtp header */
- msg_Err(p_access, "Malformed feedback packet starting with %02x, ignoring.", pkt[0]);
- return;
- }
-
- ptype = rtcp_get_pt(pkt);
- records = rtcp_get_length(pkt);
- uint16_t bytes = (uint16_t)(4 * (1 + records));
- if (bytes > bytes_left)
- {
- /* check for a sane number of bytes */
- msg_Err(p_access, "Malformed feedback packet, wrong len %d, expecting %u bytes in the" \
- " packet, got a buffer of %u bytes. ptype = %d", rtcp_get_length(pkt), bytes,
- bytes_left, ptype);
- return;
- }
-
- switch(ptype) {
- case RTCP_PT_RTPFR:
- case RTCP_PT_RTPFB:
- process_nack(p_access, ptype, records, flow, pkt);
- break;
-
- case RTCP_PT_RR:
- /*
- if (p_sys->b_ismulticast == false)
- process_rr(f, pkt, len);
- */
- break;
-
- case RTCP_PT_SDES:
- {
- if (p_sys->b_ismulticast == false)
- {
- int8_t name_length = rtcp_sdes_get_name_length(pkt);
- if (name_length > bytes_left)
- {
- /* check for a sane number of bytes */
- msg_Err(p_access, "Malformed SDES packet, wrong cname len %u, got a " \
- "buffer of %u bytes.", name_length, bytes_left);
- return;
- }
- if (memcmp(pkt + RTCP_SDES_SIZE, p_sys->receiver_name, name_length) != 0)
- {
- memcpy(p_sys->receiver_name, pkt + RTCP_SDES_SIZE, name_length);
- msg_Info(p_access, "Receiver name: %s", p_sys->receiver_name);
- }
- }
- }
- break;
-
- case RTCP_PT_SR:
- break;
-
- default:
- msg_Err(p_access, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
- }
- processed_bytes += bytes;
- }
-}
-
-static void rist_rtcp_send(sout_access_out_t *p_access)
-{
- sout_access_out_sys_t *p_sys = p_access->p_sys;
- struct rist_flow *flow = p_sys->flow;
- uint8_t rtcp_buf[RTCP_SR_SIZE + RTCP_SDES_SIZE + MAX_CNAME] = { };
- struct timeval tv;
- int r;
- uint64_t fractions;
- uint16_t namelen = strlen(flow->cname) + 1;
- gettimeofday(&tv, NULL);
-
- /* Populate SR for sender report */
- uint8_t *p_sr = rtcp_buf;
- rtp_set_hdr(p_sr);
- rtcp_sr_set_pt(p_sr);
- rtcp_sr_set_length(p_sr, 6);
- rtcp_fb_set_int_ssrc_pkt_sender(p_sr, p_sys->ssrc);
- rtcp_sr_set_ntp_time_msw(p_sr, tv.tv_sec + SEVENTY_YEARS_OFFSET);
- fractions = (uint64_t)tv.tv_usec;
- fractions <<= 32ULL;
- fractions /= 1000000ULL;
- rtcp_sr_set_ntp_time_lsw(p_sr, (uint32_t)fractions);
- rtcp_sr_set_rtp_time(p_sr, rtp_get_ts(vlc_tick_now()));
- vlc_mutex_lock( &p_sys->lock );
- rtcp_sr_set_packet_count(p_sr, flow->packets_count);
- rtcp_sr_set_octet_count(p_sr, flow->bytes_count);
- vlc_mutex_unlock( &p_sys->lock );
-
- /* Populate SDES for sender description */
- uint8_t *p_sdes = (rtcp_buf + RTCP_SR_SIZE);
- /* we need to make sure it is a multiple of 4, pad if necessary */
- if ((namelen - 2) & 0x3)
- namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
- rtp_set_hdr(p_sdes);
- rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
- rtcp_sdes_set_pt(p_sdes);
- rtcp_set_length(p_sdes, (namelen >> 2) + 2);
- rtcp_sdes_set_cname(p_sdes, 1);
- rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
- p_sdes += RTCP_SDES_SIZE;
- strlcpy((char *)p_sdes, flow->cname, namelen);
-
- /* Send the rtcp message */
- r = send(flow->fd_rtcp, rtcp_buf, RTCP_SR_SIZE + RTCP_SDES_SIZE + namelen, 0);
- (void)r;
-}
-
-static void *rist_thread(void *data)
-{
- sout_access_out_t *p_access = data;
- sout_access_out_sys_t *p_sys = p_access->p_sys;
- uint64_t now;
- uint8_t pkt[RTP_PKT_SIZE];
- struct pollfd pfd[2];
- int ret;
- ssize_t r;
-
- int poll_sockets = 1;
- pfd[0].fd = p_sys->flow->fd_rtcp;
- pfd[0].events = POLLIN;
- if (p_sys->b_ismulticast)
- {
- pfd[1].fd = p_sys->flow->fd_rtcp_m;
- pfd[1].events = POLLIN;
- poll_sockets++;
- }
-
- for (;;) {
- ret = poll(pfd, poll_sockets, RTCP_INTERVAL >> 1);
- int canc = vlc_savecancel();
- if (ret > 0)
- {
- if (pfd[0].revents & POLLIN)
- {
- r = rist_Read(p_sys->flow->fd_rtcp, pkt, RTP_PKT_SIZE);
- if (r == RTP_PKT_SIZE) {
- msg_Err(p_access, "Rist RTCP messsage is too big (%zd bytes) and was probably " \
- "cut, please keep it under %d bytes", r, RTP_PKT_SIZE);
- }
- if (unlikely(r == -1)) {
- msg_Err(p_access, "socket %d error: %s\n", p_sys->flow->fd_rtcp,
- gai_strerror(errno));
- }
- else {
- rist_rtcp_recv(p_access, p_sys->flow, pkt, r);
- }
- }
- if (p_sys->b_ismulticast && (pfd[1].revents & POLLIN))
- {
- r = rist_Read(p_sys->flow->fd_rtcp_m, pkt, RTP_PKT_SIZE);
- if (r == RTP_PKT_SIZE) {
- msg_Err(p_access, "Rist RTCP messsage is too big (%zd bytes) and was " \
- "probably cut, please keep it under %d bytes", r, RTP_PKT_SIZE);
- }
- if (unlikely(r == -1)) {
- msg_Err(p_access, "mcast socket %d error: %s\n", p_sys->flow->fd_rtcp_m,
- gai_strerror(errno));
- }
- else {
- rist_rtcp_recv(p_access, p_sys->flow, pkt, r);
- }
- }
- }
-
- /* And, in any case: */
- now = vlc_tick_now();
- if ((now - p_sys->last_rtcp_tx) > VLC_TICK_FROM_MS(RTCP_INTERVAL))
- {
- rist_rtcp_send(p_access);
- p_sys->last_rtcp_tx = now;
- }
- vlc_restorecancel (canc);
- }
-
- return NULL;
-}
-
-/****************************************************************************
- * RTP send
- ****************************************************************************/
-static void* ThreadSend( void *data )
+static int cb_stats(void *arg, const struct rist_stats *stats_container)
{
- sout_access_out_t *p_access = data;
+ sout_access_out_t *p_access = (sout_access_out_t*)arg;
sout_access_out_sys_t *p_sys = p_access->p_sys;
- vlc_tick_t i_caching = p_sys->i_ticks_caching;
- struct rist_flow *flow = p_sys->flow;
- block_t *out;
- while ((out = vlc_queue_DequeueKillable(&p_sys->queue,
- &p_sys->dead)) != NULL)
+ msg_Dbg(p_access, "[RIST-STATS]: %s", stats_container->stats_json);
+
+ const struct rist_stats_sender_peer *stats_sender_peer = &stats_container->stats.sender_peer;
+ msg_Dbg(p_access, "[RIST-STATS]: name %s, id %"PRIu32", bitrate %zu, sent %"PRIu64", received %"PRIu64", retransmitted %"PRIu64", Q %.2f, rtt %"PRIu32"ms",
+ stats_sender_peer->cname,
+ stats_sender_peer->peer_id,
+ stats_sender_peer->bandwidth,
+ stats_sender_peer->sent,
+ stats_sender_peer->received,
+ stats_sender_peer->retransmitted,
+ stats_sender_peer->quality,
+ stats_sender_peer->rtt
+ );
+
+ if (stats_sender_peer->rtt > p_sys->i_recovery_buffer)
{
- ssize_t len = 0;
- uint16_t seq = 0;
- uint32_t pkt_ts = 0;
-
- vlc_tick_wait (out->i_dts + i_caching);
-
- len = out->i_buffer;
-
- seq = rtp_get_seqnum(out->p_buffer);
- pkt_ts = rtp_get_timestamp(out->p_buffer);
-
- vlc_mutex_lock( &p_sys->fd_lock );
-#ifdef TEST_PACKET_LOSS
-# warning COMPILED WITH SELF INFLICTED PACKET LOSS
- if ((seq % 14) == 0) {
- /*msg_Err(p_access, "Dropped packet with seq number %d ...", seq);*/
- }
- else
- {
- if (rist_Write(flow->fd_out, out->p_buffer, len) != len) {
- msg_Err(p_access, "Error sending data packet after 2 tries ...");
- }
- }
-#else
- if (rist_Write(flow->fd_out, out->p_buffer, len) != len) {
- msg_Err(p_access, "Error sending data packet after 2 tries ...");
- }
-#endif
- vlc_mutex_unlock( &p_sys->fd_lock );
-
- /* Insert Into Queue */
- vlc_mutex_lock( &p_sys->lock );
- /* Always replace the existing one with the new one */
- struct rtp_pkt *pkt;
- pkt = &(flow->buffer[seq]);
- if (pkt->buffer)
- {
- block_Release(pkt->buffer);
- pkt->buffer = NULL;
- }
- pkt->rtp_ts = pkt_ts;
- pkt->buffer = out;
-
- if (flow->reset == 1)
- {
- msg_Info(p_access, "Traffic detected");
- /* First packet in the queue */
- flow->reset = 0;
- }
- flow->wi = seq;
- flow->hi_timestamp = pkt_ts;
- /* Stats for RTCP feedback packets */
- flow->packets_count++;
- flow->bytes_count += len;
- flow->last_out = seq;
- vlc_mutex_unlock( &p_sys->lock );
-
- /* We print out the stats once per second */
- uint64_t now = vlc_tick_now();
- uint64_t interval = (now - p_sys->i_last_stat);
- if ( interval > VLC_TICK_FROM_MS(STATS_INTERVAL) )
- {
- if (p_sys->i_retransmit_packets > 0)
- {
- float quality = 100;
- if (p_sys->i_total_packets > 0)
- quality = (float)100 - (float)100*(float)(p_sys->i_retransmit_packets)
- /(float)p_sys->i_total_packets;
- msg_Info(p_access, "STATS: Total %u, Retransmitted %u, Link Quality %.2f%%",
- p_sys->i_total_packets, p_sys->i_retransmit_packets, quality);
- }
- p_sys->i_last_stat = now;
- p_sys->i_retransmit_packets = 0;
- p_sys->i_total_packets = 0;
- }
- p_sys->i_total_packets++;
+ msg_Err(p_access, "The RTT between us and the receiver is higher than the configured recovery buffer size, %"PRIu32" > %"PRIu32" ms, you should increase the recovery buffer size",
+ stats_sender_peer->rtt, p_sys->i_recovery_buffer);
}
- return NULL;
-}
-static void SendtoFIFO( sout_access_out_t *p_access, block_t *buffer )
-{
- sout_access_out_sys_t *p_sys = p_access->p_sys;
- uint16_t seq = p_sys->rtp_counter++;
-
- /* Set fresh rtp header data */
- uint8_t *bufhdr = buffer->p_buffer;
- rtp_set_hdr(bufhdr);
- rtp_set_type(bufhdr, MPEG_II_TRANSPORT_STREAM);
- rtp_set_seqnum(bufhdr, seq);
- rtp_set_int_ssrc(bufhdr, p_sys->ssrc);
- uint32_t pkt_ts = rtp_get_ts(buffer->i_dts);
- rtp_set_timestamp(bufhdr, pkt_ts);
-
- vlc_queue_Enqueue(&p_sys->queue, block_Duplicate(buffer));
+ rist_stats_free(stats_container);
+ return 0;
}
static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
@@ -597,56 +102,24 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
sout_access_out_sys_t *p_sys = p_access->p_sys;
int i_len = 0;
+ struct rist_data_block rist_buffer = { 0 };
+ rist_buffer.virt_src_port = p_sys->gre_src_port;
+ rist_buffer.virt_dst_port = p_sys->gre_dst_port;
+
while( p_buffer )
{
block_t *p_next;
- int i_block_split = 0;
-
- if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_packet_size )
- {
- msg_Warn( p_access, "Buffer data size (%zu) > configured packet size (%zu), you " \
- "should probably increase the configured packet size", p_buffer->i_buffer,
- p_sys->i_packet_size );
- p_sys->b_mtu_warning = true;
- }
-
- /* Temp buffer is already too large, flush */
- if( p_sys->p_pktbuffer->i_buffer + p_buffer->i_buffer > p_sys->i_packet_size )
- {
- SendtoFIFO(p_access, p_sys->p_pktbuffer);
- p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
- }
i_len += p_buffer->i_buffer;
while( p_buffer->i_buffer )
{
-
- size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_packet_size );
-
- i_block_split++;
-
- if( p_sys->p_pktbuffer->i_buffer == RTP_HEADER_SIZE )
- {
- p_sys->p_pktbuffer->i_dts = p_buffer->i_dts;
- }
-
- memcpy( p_sys->p_pktbuffer->p_buffer + p_sys->p_pktbuffer->i_buffer,
- p_buffer->p_buffer, i_write );
-
- p_sys->p_pktbuffer->i_buffer += i_write;
+ size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_max_packet_size );
+ rist_buffer.payload = p_buffer->p_buffer;
+ rist_buffer.payload_len = p_buffer->i_buffer;
+ rist_sender_data_write(p_sys->sender_ctx, &rist_buffer);
p_buffer->p_buffer += i_write;
p_buffer->i_buffer -= i_write;
-
- /* Flush if we reached the target size for the case of block size < target packet size.
- * Also flush when we are in block_split > 1 for the case when the block_size is
- * larger than the packet-size because we need to continue the inner loop */
- if( p_sys->p_pktbuffer->i_buffer == p_sys->i_packet_size || i_block_split > 1 )
- {
- SendtoFIFO(p_access, p_sys->p_pktbuffer);
- p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
- }
-
}
p_next = p_buffer->p_next;
@@ -654,10 +127,6 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
p_buffer = p_next;
}
-
- if ( i_len <= 0 ) {
- block_ChainRelease( p_buffer );
- }
return i_len;
}
@@ -681,49 +150,13 @@ static int Control( sout_access_out_t *p_access, int i_query, va_list args )
return i_ret;
}
-static void Clean( sout_access_out_t *p_access )
-{
- sout_access_out_sys_t *p_sys = p_access->p_sys;
-
- if ( p_sys->flow )
- {
- if (p_sys->flow->fd_out >= 0) {
- net_Close (p_sys->flow->fd_out);
- }
- if (p_sys->flow->fd_rtcp >= 0) {
- net_Close (p_sys->flow->fd_rtcp);
- }
- if (p_sys->flow->fd_rtcp_m >= 0) {
- net_Close (p_sys->flow->fd_rtcp_m);
- }
- for (int i=0; i<RIST_QUEUE_SIZE; i++) {
- struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
- if (pkt->buffer)
- {
- block_Release(pkt->buffer);
- pkt->buffer = NULL;
- }
- }
- free(p_sys->flow->buffer);
- free(p_sys->flow);
- }
-
- if (p_sys->p_pktbuffer)
- block_Release(p_sys->p_pktbuffer);
-}
-
static void Close( vlc_object_t * p_this )
{
sout_access_out_t *p_access = (sout_access_out_t*)p_this;
sout_access_out_sys_t *p_sys = p_access->p_sys;
- vlc_cancel(p_sys->ristthread);
- vlc_queue_Kill(&p_sys->queue, &p_sys->dead);
-
- vlc_join(p_sys->ristthread, NULL);
- vlc_join(p_sys->senderthread, NULL);
-
- Clean( p_access );
+ rist_destroy(p_sys->sender_ctx);
+ p_sys->sender_ctx = NULL;
}
static int Open( vlc_object_t *p_this )
@@ -737,78 +170,65 @@ static int Open( vlc_object_t *p_this )
|| var_Create ( p_access, "src-addr", VLC_VAR_STRING ) )
{
msg_Err( p_access, "Valid network information is required." );
- return VLC_ENOMEM;
+ return VLC_EGENERIC;
}
- config_ChainParse( p_access, SOUT_CFG_PREFIX, ppsz_sout_options, p_access->p_cfg );
+ config_ChainParse( p_access, RIST_CFG_PREFIX, ppsz_sout_options, p_access->p_cfg );
p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
if( unlikely( p_sys == NULL ) )
return VLC_ENOMEM;
- int i_dst_port = RIST_DEFAULT_PORT;
- char *psz_dst_addr;
- char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
- if( !psz_dst_addr )
- return VLC_ENOMEM;
+ p_access->p_sys = p_sys;
- if ( psz_parser[0] == '[' )
- psz_parser = strchr( psz_parser, ']' );
- psz_parser = strchr( psz_parser ? psz_parser : psz_dst_addr, ':' );
- if ( psz_parser != NULL )
- {
- *psz_parser++ = '\0';
- i_dst_port = atoi( psz_parser );
+ p_sys->gre_src_port = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_SRC_PORT);
+ p_sys->gre_dst_port = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_DST_PORT);
+ if (p_sys->gre_dst_port % 2 != 0) {
+ msg_Err( p_access, "Virtual destination port must be an even number." );
+ return VLC_EGENERIC;
}
- vlc_mutex_init( &p_sys->lock );
- vlc_mutex_init( &p_sys->fd_lock );
+ p_sys->i_max_packet_size = rist_get_max_packet_size((vlc_object_t *)p_access);
- msg_Info(p_access, "Connecting RIST output to %s:%d and %s:%d", psz_dst_addr, i_dst_port,
- psz_dst_addr, i_dst_port+1);
- p_sys->b_ismulticast = is_multicast_address(psz_dst_addr);
- struct rist_flow *flow = rist_udp_transmitter(p_access, psz_dst_addr, i_dst_port,
- p_sys->b_ismulticast);
- free (psz_dst_addr);
- if (!flow)
- goto failed;
+ int i_rist_profile = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE);
+ int i_verbose_level = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL);
- p_sys->flow = flow;
- flow->latency = var_InheritInteger(p_access, SOUT_CFG_PREFIX "buffer-size");
- flow->rtp_latency = rtp_get_ts(VLC_TICK_FROM_MS(flow->latency));
- p_sys->ssrc = var_InheritInteger(p_access, SOUT_CFG_PREFIX "ssrc");
- if (p_sys->ssrc == 0) {
- vlc_rand_bytes(&p_sys->ssrc, 4);
+
+ //disable global logging: see comment in access/rist.c
+ struct rist_logging_settings rist_global_logging_settings = LOGGING_SETTINGS_INITIALIZER;
+ if (rist_logging_set_global(&rist_global_logging_settings) != 0) {
+ msg_Err(p_access,"Could not set logging\n");
+ return VLC_EGENERIC;
}
- /* Last bit of ssrc must be 0 for normal data and 1 for retries */
- p_sys->ssrc &= ~(1 << 0);
-
- msg_Info(p_access, "SSRC: 0x%08X", p_sys->ssrc);
- p_sys->i_ticks_caching = VLC_TICK_FROM_MS(var_InheritInteger( p_access,
- SOUT_CFG_PREFIX "caching"));
- p_sys->i_packet_size = var_InheritInteger(p_access, SOUT_CFG_PREFIX "packet-size" );
- p_sys->dead = false;
- vlc_queue_Init(&p_sys->queue, offsetof (block_t, p_next));
- p_sys->p_pktbuffer = block_Alloc( p_sys->i_packet_size );
- if( unlikely(p_sys->p_pktbuffer == NULL) )
- goto failed;
- p_sys->p_pktbuffer->i_buffer = RTP_HEADER_SIZE;
+ struct rist_logging_settings *logging_settings = &p_sys->logging_settings;
+ logging_settings->log_socket = -1;
+ logging_settings->log_stream = NULL;
+ logging_settings->log_level = i_verbose_level;
+ logging_settings->log_cb = log_cb;
+ logging_settings->log_cb_arg = p_access;
- p_access->p_sys = p_sys;
+ if (rist_sender_create(&p_sys->sender_ctx, i_rist_profile, 0, logging_settings) != 0) {
+ msg_Err( p_access, "Could not create rist sender context\n");
+ return VLC_EGENERIC;
+ }
- if( vlc_clone(&p_sys->senderthread, ThreadSend, p_access, VLC_THREAD_PRIORITY_HIGHEST ) )
- {
- msg_Err(p_access, "Failed to create sender thread.");
+ // Enable stats data
+ if (rist_stats_callback_set(p_sys->sender_ctx, 1000, cb_stats, (void *)p_access) == -1) {
+ msg_Err(p_access, "Could not enable stats callback");
goto failed;
}
- if (vlc_clone(&p_sys->ristthread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
- {
- msg_Err(p_access, "Failed to create worker thread.");
- vlc_queue_Kill(&p_sys->queue, &p_sys->dead);
- vlc_join(p_sys->senderthread, NULL);
+ int i_multipeer_mode = var_InheritInteger(p_access, RIST_CFG_PREFIX "multipeer-mode");
+ int i_recovery_length = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_CFG_LATENCY);
+ p_sys->i_recovery_buffer = i_recovery_length;
+
+ if ( !rist_add_peers((vlc_object_t *)p_access, p_sys->sender_ctx, p_access->psz_path, i_multipeer_mode, p_sys->gre_dst_port + 1, i_recovery_length) )
+ goto failed;
+
+ if (rist_start(p_sys->sender_ctx) == -1) {
+ msg_Err( p_access, "Could not start rist sender\n");
goto failed;
}
@@ -818,31 +238,32 @@ static int Open( vlc_object_t *p_this )
return VLC_SUCCESS;
failed:
- Clean( p_access );
+ rist_destroy(p_sys->sender_ctx);
+ p_sys->sender_ctx = NULL;
return VLC_EGENERIC;
}
-#define CACHING_TEXT N_("RIST data output caching size (ms)")
-#define CACHING_LONGTEXT N_( \
- "Having this cache will guarantee that the packets going out are " \
- "delivered at a spacing determined by the chain timestamps thus ensuring " \
- "a near jitter free output. Be aware that this setting will also add to " \
- "the overall latency of the stream." )
-
-#define BUFFER_TEXT N_("RIST retry-buffer queue size (ms)")
-#define BUFFER_LONGTEXT N_( \
- "This must match the buffer size (latency) configured on the server side. If you " \
- "are not sure, leave the default of 0 which will set it the maximum " \
- "value and will use about 100MB of RAM" )
-
-#define SSRC_TEXT N_("SSRC used in RTP output (default is random, i.e. 0)")
-#define SSRC_LONGTEXT N_( \
- "Use this setting to specify a known SSRC for the RTP header. This is only useful " \
- "if your receiver acts on it. When using VLC as receiver, it is not." )
-
-#define NAME_TEXT N_("Stream name")
-#define NAME_LONGTEXT N_( \
- "This Stream name will be sent to the receiver using the rist RTCP channel" )
+#define SRC_PORT_TEXT N_("Virtual Source Port")
+#define SRC_PORT_LONGTEXT N_( \
+ "Source port to be used inside the reduced-mode of the main profile" )
+
+#define DST_PORT_TEXT N_("Virtual Destination Port")
+#define DST_PORT_LONGTEXT N_( \
+ "Destination port to be used inside the reduced-mode of the main profile" )
+
+/* The default target payload size */
+#define RIST_DEFAULT_TARGET_PAYLOAD_SIZE 1316
+
+/* Multipeer mode */
+#define RIST_DEFAULT_MULTIPEER_MODE 0
+#define RIST_MULTIPEER_MODE_TEXT N_("Multipeer mode")
+#define RIST_MULTIPEER_MODE_LONGTEXT N_( \
+ "This allows you to select between duplicate or load balanced modes when " \
+ "sending data to multiple peers (several network paths)" )
+static const int multipeer_mode_type[] = { 0, 5, };
+static const char *const multipeer_mode_type_names[] = {
+ N_("Duplicate"), N_("Load balanced"),
+};
/* Module descriptor */
vlc_module_begin()
@@ -852,19 +273,49 @@ vlc_module_begin()
set_category( CAT_SOUT )
set_subcategory( SUBCAT_SOUT_ACO )
- add_integer( SOUT_CFG_PREFIX "packet-size", RIST_TARGET_PACKET_SIZE,
- N_("RIST target packet size (bytes)"), NULL )
- add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_CACHING_DELAY,
- CACHING_TEXT, CACHING_LONGTEXT )
- add_integer( SOUT_CFG_PREFIX "buffer-size", DEFAULT_BUFFER_SIZE,
+ add_integer( RIST_CFG_PREFIX RIST_CFG_MAX_PACKET_SIZE, RIST_DEFAULT_TARGET_PAYLOAD_SIZE,
+ N_("RIST target payload size (bytes)"), NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_SRC_PORT, RIST_DEFAULT_VIRT_SRC_PORT,
+ SRC_PORT_TEXT, SRC_PORT_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VIRT_DST_PORT, RIST_DEFAULT_VIRT_DST_PORT,
+ DST_PORT_TEXT, DST_PORT_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX "multipeer-mode", RIST_DEFAULT_MULTIPEER_MODE,
+ RIST_MULTIPEER_MODE_TEXT, RIST_MULTIPEER_MODE_LONGTEXT )
+ change_integer_list( multipeer_mode_type, multipeer_mode_type_names )
+ add_string( RIST_CFG_PREFIX RIST_CFG_URL2, NULL, RIST_URL2_TEXT, NULL )
+ add_string( RIST_CFG_PREFIX RIST_CFG_URL3, NULL, RIST_URL3_TEXT, NULL )
+ add_string( RIST_CFG_PREFIX RIST_CFG_URL4, NULL, RIST_URL4_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_BANDWIDTH, RIST_DEFAULT_RECOVERY_MAXBITRATE,
+ RIST_MAX_BITRATE_TEXT, RIST_MAX_BITRATE_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_RETRY_INTERVAL, RIST_DEFAULT_RECOVERY_RTT_MIN,
+ RIST_RETRY_INTERVAL_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_REORDER_BUFFER, RIST_DEFAULT_RECOVERY_REORDER_BUFFER,
+ RIST_REORDER_BUFFER_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_MAX_RETRIES, RIST_DEFAULT_MAX_RETRIES,
+ RIST_MAX_RETRIES_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL, RIST_DEFAULT_VERBOSE_LEVEL,
+ RIST_VERBOSE_LEVEL_TEXT, RIST_VERBOSE_LEVEL_LONGTEXT )
+ change_integer_list( verbose_level_type, verbose_level_type_names )
+ add_integer( RIST_CFG_PREFIX RIST_CFG_LATENCY, RIST_DEFAULT_RECOVERY_LENGHT_MIN,
BUFFER_TEXT, BUFFER_LONGTEXT )
- add_integer( SOUT_CFG_PREFIX "ssrc", 0,
- SSRC_TEXT, SSRC_LONGTEXT )
- add_string( SOUT_CFG_PREFIX "stream-name", NULL, NAME_TEXT, NAME_LONGTEXT )
-
- set_capability( "sout access", 0 )
- add_shortcut( "rist", "tr06" )
+ add_string( RIST_CFG_PREFIX RIST_URL_PARAM_CNAME, NULL, RIST_CNAME_TEXT,
+ RIST_CNAME_LONGTEXT )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE, RIST_DEFAULT_PROFILE,
+ RIST_PROFILE_TEXT, RIST_PROFILE_LONGTEXT )
+ add_password( RIST_CFG_PREFIX RIST_URL_PARAM_SECRET, "",
+ RIST_SHARED_SECRET_TEXT, NULL )
+ add_integer( RIST_CFG_PREFIX RIST_URL_PARAM_AES_TYPE, 0,
+ RIST_ENCRYPTION_TYPE_TEXT, NULL )
+ change_integer_list( rist_encryption_type, rist_encryption_type_names )
+ add_string( RIST_CFG_PREFIX RIST_URL_PARAM_SRP_USERNAME, "",
+ RIST_SRP_USERNAME_TEXT, NULL )
+ add_password( RIST_CFG_PREFIX RIST_URL_PARAM_SRP_PASSWORD, "",
+ RIST_SRP_PASSWORD_TEXT, NULL )
+
+ set_capability( "sout access", 10 )
+ add_shortcut( "librist", "rist", "tr06" )
set_callbacks( Open, Close )
vlc_module_end ()
+
View it on GitLab: https://code.videolan.org/videolan/vlc/-/compare/bd8359f36ffb8ec44e8f393a28cb3a78a7c5368e...1fa33c1886f37ff50709c56c23e0a4d8a8b7b4d6
--
View it on GitLab: https://code.videolan.org/videolan/vlc/-/compare/bd8359f36ffb8ec44e8f393a28cb3a78a7c5368e...1fa33c1886f37ff50709c56c23e0a4d8a8b7b4d6
You're receiving this email because of your account on code.videolan.org.
More information about the vlc-commits
mailing list