[vlc-devel] [PATCH] Implementation of Dozer streaming protocol
Jean-Baptiste Kempf
jb at videolan.org
Mon Oct 3 22:27:24 CEST 2016
Hi,
On 30 Sep, Daniele Lacamera wrote :
> This is an implementation of a subset of the dozer network protocol
> created by Sergio Ammirata. The protocol is UDP based and the recovery
> algorithm is based on SMPTE2022. In this implementation, the
> buffer/delay size is fixed and equal to 1316 * 8 * cols * rows / bps;
> For example, for a 1Mbps stream with the default 10x5 matrix with 30%
> overhead, we would have a 500ms buffer and and an adequate recovery of
> up to 3% packet loss.
Missing NEWS entry
> +libdozer_plugin_la_SOURCES = access/dozer.c
> +libdozer_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
> +access_LTLIBRARIES += libdozer_plugin.la
Why LIBPTHREAD?
> +#ifdef HAVE_CONFIG_H
> +# include "config.h"
> +#endif
> +
> +#include <errno.h>
> +#include <vlc_common.h>
> +#include <vlc_plugin.h>
> +#include <vlc_access.h>
> +#include <vlc_network.h>
> +#include <vlc_block.h>
> +#include <vlc_interrupt.h>
> +#include <vlc_atomic.h>
> +#ifdef HAVE_POLL
> +# include <poll.h>
> +#endif
> +#include <fcntl.h>
Do you need all those headers?
> +struct __attribute__((packed)) dozer_hdr {
Use ATTR_PACKED or ATTRIBUTE_PACKED
> +#define DOZER_LONGTEXT N_("Dozer input module: This module provides an implementation " \
> + "of a subset of the dozer network protocol created by Sergio Ammirata. " \
No way we ask people to translate that...
> +struct access_sys_t
> +{
> + int fd;
> + int timeout;
> + size_t mtu;
> + size_t fifo_size;
> + block_fifo_t *fifo;
> +
> + block_t **fec_cols;
> + block_t **fec_rows;
> + block_t **matrix;
> +
> + /* Matrix size */
> + uint16_t n_rows;
> + uint16_t n_cols;
> + uint16_t n_tot;
> +
> + /* Counters/flags */
> + uint16_t fec_cur_win;
> + uint16_t fec_last_rx;
> + uint16_t fec_initialized;
> +
> + vlc_sem_t semaphore;
> + vlc_thread_t thread;
> + atomic_bool timeout_reached;
> +};
> +
Do you really need all this in the sys structure?
> +/*****************************************************************************
> + * Local prototypes
> + *****************************************************************************/
> +static block_t *BlockUDP( access_t *, bool * );
> +static int Control( access_t *, int, va_list );
> +static void* ThreadRead( void *data );
> +
> +/*****************************************************************************
> + * FEC functions
> + *****************************************************************************/
> +static void fec_xor(block_t *dst, block_t *f1, block_t *f2)
> +{
> + uint32_t len, len1, len2;
> + uint32_t i;
> +
> +
> + len1 = f1->i_buffer;
> + len2 = f2->i_buffer;
> + len = len1;
> +
> + if (len2 > len1)
> + len = len2;
> +
> + if (len < 10)
> + return;
> +
> + for(i = 10; i < len; i++) {
> + if (i >= len1) {
> + dst->p_buffer[i] = f2->p_buffer[i];
> + } else if (i >= len2) {
> + dst->p_buffer[i] = f1->p_buffer[i];
> + }
> + else dst->p_buffer[i] = f1->p_buffer[i] ^ f2->p_buffer[i];
> + }
> + dst->i_buffer = len;
> +}
Use C99 counters and remove trailing spaces.
> +/*****************************************************************************
> + * Open: open the socket
> + *****************************************************************************/
> +static int Open( vlc_object_t *p_this )
> +{
> + access_t *p_access = (access_t*)p_this;
> + access_sys_t *sys;
> +
> + if( p_access->b_preparsing )
> + return VLC_EGENERIC;
> +
> + sys = calloc( sizeof( *sys ), 1 );
> + if( unlikely( sys == NULL ) )
> + return VLC_ENOMEM;
> +
> + p_access->p_sys = sys;
> +
> + /* Set up p_access */
> + ACCESS_SET_CALLBACKS( NULL, BlockUDP, Control, NULL );
> +
> + char *psz_name = strdup( p_access->psz_location );
> + char *psz_parser;
> + const char *psz_server_addr, *psz_bind_addr = "";
> + int i_bind_port = 1234, i_server_port = 0;
> +
> + if( unlikely(psz_name == NULL) )
> + goto error;
> +
> + /* Parse psz_name syntax :
> + * [serveraddr[:serverport]][@[bindaddr]:[bindport]] */
Can't you use an already existing parser from VLC?
> + psz_parser = strchr( psz_name, '@' );
> + if( psz_parser != NULL )
> + {
> + /* Found bind address and/or bind port */
> + *psz_parser++ = '\0';
> + psz_bind_addr = psz_parser;
> +
> + if( psz_bind_addr[0] == '[' )
> + /* skips bracket'd IPv6 address */
> + psz_parser = strchr( psz_parser, ']' );
> +
> + if( psz_parser != NULL )
> + {
> + psz_parser = strchr( psz_parser, ':' );
> + if( psz_parser != NULL )
> + {
> + *psz_parser++ = '\0';
> + i_bind_port = atoi( psz_parser );
> + }
> + }
> + }
> +
> + psz_server_addr = psz_name;
> + psz_parser = ( psz_server_addr[0] == '[' )
> + ? strchr( psz_name, ']' ) /* skips bracket'd IPv6 address */
> + : psz_name;
> +
> + if( psz_parser != NULL )
> + {
> + psz_parser = strchr( psz_parser, ':' );
> + if( psz_parser != NULL )
> + {
> + *psz_parser++ = '\0';
> + i_server_port = atoi( psz_parser );
> + }
> + }
> +
> + msg_Dbg( p_access, "opening server=%s:%d local=%s:%d",
> + psz_server_addr, i_server_port, psz_bind_addr, i_bind_port );
> +
> + sys->fd = net_OpenDgram( p_access, psz_bind_addr, i_bind_port,
> + psz_server_addr, i_server_port, IPPROTO_UDP );
> + free( psz_name );
> + if( sys->fd == -1 )
> + {
> + msg_Err( p_access, "cannot open socket" );
> + goto error;
> + }
> +
> + /* Revert to blocking I/O */
> +#ifndef _WIN32
> + fcntl(sys->fd, F_SETFL, fcntl(sys->fd, F_GETFL) & ~O_NONBLOCK);
> +#else
> + ioctlsocket(sys->fd, FIONBIO, &(unsigned long){ 0 });
> +#endif
> +
> + sys->fifo = block_FifoNew();
> + if( unlikely( sys->fifo == NULL ) )
> + {
> + net_Close( sys->fd );
> + goto error;
> + }
> +
> + sys->mtu = 7 * 188 + sizeof(struct dozer_hdr);
This should be defines.
> + sys->fifo_size = var_CreateGetInteger( p_access, "udp-buffer");
> + vlc_sem_init( &sys->semaphore, 0 );
> +
> + sys->timeout = var_CreateGetInteger( p_access, "udp-timeout");
Why not Inherit?
> + atomic_init(&sys->timeout_reached, false);
> + if( sys->timeout > 0)
> + sys->timeout *= 1000;
> +
> + if( vlc_clone( &sys->thread, ThreadRead, p_access,
> + VLC_THREAD_PRIORITY_INPUT ) )
> + {
> + vlc_sem_destroy( &sys->semaphore );
> + block_FifoRelease( sys->fifo );
> + net_Close( sys->fd );
> +error:
> + free( sys );
> + return VLC_EGENERIC;
> + }
> +
> + return VLC_SUCCESS;
> +}
> +
> +/*****************************************************************************
> + * Close: free unused data structures
> + *****************************************************************************/
> +static void Close( vlc_object_t *p_this )
> +{
> + access_t *p_access = (access_t*)p_this;
> + access_sys_t *sys = p_access->p_sys;
> +
> + vlc_cancel( sys->thread );
> + vlc_join( sys->thread, NULL );
> + vlc_sem_destroy( &sys->semaphore );
> + block_FifoRelease( sys->fifo );
> + net_Close( sys->fd );
> + free( sys );
> +}
> +
> +/*****************************************************************************
> + * Control:
> + *****************************************************************************/
> +static int Control( access_t *p_access, int i_query, va_list args )
> +{
> + bool *pb_bool;
> + int64_t *pi_64;
> +
> + switch( i_query )
> + {
> + case STREAM_CAN_SEEK:
> + case STREAM_CAN_FASTSEEK:
> + case STREAM_CAN_PAUSE:
> + case STREAM_CAN_CONTROL_PACE:
> + pb_bool = (bool*)va_arg( args, bool* );
> + *pb_bool = false;
> + break;
> +
> + case STREAM_GET_PTS_DELAY:
> + pi_64 = (int64_t*)va_arg( args, int64_t * );
> + *pi_64 = INT64_C(1000)
> + * var_InheritInteger(p_access, "network-caching");
> + break;
> +
> + default:
> + return VLC_EGENERIC;
> + }
> + return VLC_SUCCESS;
> +}
> +
> +
> +/*****************************************************************************
> + * BlockUDP:
> + *****************************************************************************/
> +static block_t *BlockUDP( access_t *p_access, bool *restrict eof )
> +{
> + access_sys_t *sys = p_access->p_sys;
> + block_t *block;
> + if (atomic_load(&sys->timeout_reached)) {
> + *eof = true;
> + return NULL;
> + }
> + vlc_sem_wait_i11e(&sys->semaphore);
> + vlc_fifo_Lock(sys->fifo);
> + block = vlc_fifo_DequeueUnlocked(sys->fifo);
> + vlc_fifo_Unlock(sys->fifo);
> + return block;
> +}
> +
> +static void fifo_enqueue(access_t *access, block_t *pkt)
> +{
> + access_sys_t *sys = access->p_sys;
> + /* Discard old buffers on overflow */
> + while (vlc_fifo_GetBytes(sys->fifo) + pkt->i_buffer > sys->fifo_size)
> + {
> + int canc = vlc_savecancel();
> + block_Release(vlc_fifo_DequeueUnlocked(sys->fifo));
> + vlc_restorecancel(canc);
> + }
> + pkt->p_buffer += sizeof(struct dozer_hdr);
> + pkt->i_buffer -= sizeof(struct dozer_hdr);
> + vlc_fifo_QueueUnlocked(sys->fifo, pkt);
> +}
> +
> +static int fec_found_and_recovered(access_t *access, block_t *new, uint32_t new_idx, uint16_t win)
> +{
> + struct dozer_hdr *new_hdr = (struct dozer_hdr *)new->p_buffer;
> + access_sys_t *sys = access->p_sys;
> +
> + new_hdr->type = FEC_TYPE_DATA;
> + new_hdr->seq = htons(new_idx);
> + new_hdr->win = htons(win);
> + new_hdr->count = 0;
> +
> + if (htons(new_hdr->size) != (new->i_buffer - sizeof(struct dozer_hdr))) {
> + msg_Err(access, "Dozer Recovery FAILED. Invalid size %u for recovered packet.", ntohs(new_hdr->size));
> + return 0;
> + }
> + msg_Dbg(access, "Dozer Packet Recovered. Cur win: %u idx: %u buffer size: %lu size stored: %u", ntohs(win), new_idx, new->i_buffer, ntohs(new_hdr->size));
Please wrap your lines.
> + sys->matrix[new_idx] = new;
> +
> + if ((new_idx == sys->fec_last_rx + 1U)) {
> + sys->fec_last_rx++;
> + }
> +
> + while (sys->fec_last_rx < (sys->n_tot - 1) && (sys->matrix[sys->fec_last_rx + 1]) ) {
> + sys->fec_last_rx++;
> + }
> + return 1;
> +}
> +
> +static void fec_try_recovery(access_t *access, block_t *pkt)
> +{
> + access_sys_t *sys = access->p_sys;
> + struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
> + uint16_t pkt_win = ntohs(hdr->win);
> + uint16_t pkt_seq = ntohs(hdr->seq);
> + uint16_t pkt_type = ntohs(hdr->type);
> + uint32_t i;
> + int idx;
> + int found = 0;
> + int rec_r = -1, rec_c = -1;
> + int new_idx = -1;
> + block_t *new = NULL;
> +
> + if (pkt_win != sys->fec_cur_win) {
> + block_Release(pkt);
> + return;
> + }
> + if (pkt_type == FEC_TYPE_FEC_COL) {
> + sys->fec_cols[pkt_seq] = pkt;
> + for (i = 0; i < sys->n_rows; i++) {
> + idx = i * sys->n_cols + pkt_seq;
> + if (!sys->matrix[idx]) {
> + found++;
> + rec_r = i;
> + new_idx = idx;
> + }
> + }
> + if (found == 1) {
> + new = block_Alloc(sys->mtu);
> + memset(new->p_buffer, 0, sys->mtu);
> + new->i_buffer = 0;
> + fec_xor(new, new, pkt);
> + for (i = 0; i < sys->n_rows; i++) {
> + if (i != (uint32_t)rec_r) {
> + fec_xor(new, new, sys->matrix[i * sys->n_cols + pkt_seq]);
> + }
> + }
> + }
> + } else if (pkt_type == FEC_TYPE_FEC_ROW) {
> + sys->fec_rows[pkt_seq] = pkt;
> + for (i = 0; i < sys->n_cols; i++) {
> + idx = sys->n_cols * pkt_seq + i;
> + if (!sys->matrix[idx]) {
> + found++;
> + rec_c = i;
> + new_idx = idx;
> + }
> + }
> + if (found == 1) {
> + new = block_Alloc(sys->mtu);
> + memset(new->p_buffer, 0, sys->mtu);
> + new->i_buffer =0;
> + fec_xor(new, new, pkt);
> + for (i = 0; i < sys->n_cols; i++) {
> + if (i != (uint32_t)rec_c) {
> + fec_xor(new, new, sys->matrix[pkt_seq * sys->n_cols + i]);
> + }
> + }
> + }
> + }
> +
> + /* If found and recovered... */
> + if (new) {
> + fec_found_and_recovered(access, new, new_idx, ntohs(hdr->win));
> + }
> +}
> +
> +static void process_fec_initials(access_t *access, block_t *pkt)
> +{
> + access_sys_t *sys = access->p_sys;
> + struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
> + uint16_t pkt_type = ntohs(hdr->type);
> + uint16_t pkt_count = ntohs(hdr->count);
> +
> + if (pkt_type == FEC_TYPE_FEC_COL) {
> + sys->n_cols = pkt_count;
> + }
> + if (pkt_type == FEC_TYPE_FEC_ROW) {
> + sys->n_rows = pkt_count;
> + }
> + if ((sys->n_cols > 0) && (sys->n_rows > 0)) {
> + sys->n_tot = sys->n_cols * sys->n_rows;
> + msg_Dbg(access, "******* Initialized Dozer FEC recovery matrix %ux%u", sys->n_cols, sys->n_rows);
> + sys->matrix = calloc(sizeof(block_t*) * sys->n_tot, 1);
> + sys->fec_rows = calloc(sys->n_rows * (sizeof(block_t *)), 1);
> + sys->fec_cols = calloc(sys->n_cols * (sizeof(block_t *)), 1);
> + sys->fec_initialized = 1;
> + fec_try_recovery(access, pkt);
> + } else {
> + block_Release(pkt);
> + }
> +}
> +
> +static int recover_frame(access_t *access, uint32_t idx)
> +{
> + uint32_t i, r, c;
> + int can_recover;
> + access_sys_t *sys = access->p_sys;
> + block_t *new = NULL;
> +
> + r = idx / sys->n_cols;
> + c = idx % sys->n_cols;
> + can_recover = 0;
> +
> + if (sys->fec_cols[c] != NULL) {
> + can_recover = 1;
> + for (i = 0; i < sys->n_rows; i++) {
> + uint32_t cur = i * sys->n_cols + c;
> + if (((sys->matrix[cur]) == NULL) && (cur != idx)) {
> + /* Cannot recover: packet missing */
> + can_recover = 0;
> + break;
> + }
> + }
> + if (can_recover) {
> + block_t *new;
> + msg_Dbg(access, "Extra recovery of packet %u via column %u\n", idx, c);
> + new = block_Alloc(sys->mtu);
> + if (!new)
> + return 0;
> + memset(new->p_buffer, 0, sys->mtu);
> + new->i_buffer =0;
> + fec_xor(new, new, sys->fec_cols[c]);
> + for (i = 0; i < sys->n_rows; i++) {
> + uint32_t cur = i * sys->n_cols + c;
> + if (i != r)
> + fec_xor(new, new, sys->matrix[cur]);
> + }
> + }
> + }
> +
> + can_recover = 0;
> + if (sys->fec_rows[r] != NULL) {
> + can_recover = 1;
> + for (i = 0; i < sys->n_cols; i++) {
> + uint32_t cur = r * sys->n_cols + i;
> + if (((sys->matrix[cur]) == NULL) && (cur != idx)) {
> + /* Cannot recover: packet missing */
> + can_recover = 0;
> + break;
> + }
> + }
> + if (can_recover) {
> + block_t *new;
> + msg_Dbg(access, "Extra recovery of packet %u via row %u\n", idx, c);
> + new = block_Alloc(sys->mtu);
> + if (!new)
> + return 0;
> + memset(new->p_buffer, 0, sys->mtu);
> + new->i_buffer =0;
> + fec_xor(new, new, sys->fec_rows[r]);
> + for (i = 0; i < sys->n_cols; i++) {
> + uint32_t cur = r * sys->n_cols + i;
> + if (i != c)
> + fec_xor(new, new, sys->matrix[cur]);
> + }
> + }
> + }
> + if (new)
> + return fec_found_and_recovered(access, new, idx, sys->fec_cur_win);
> + else {
> + msg_Dbg(access, "Extra recovery of packet %u failed.\n", idx);
> + return 0;
> + }
> +}
> +
> +static void flush_win(access_t *access)
> +{
> + uint32_t i;
> + int frame_recovered;
> + access_sys_t *sys = access->p_sys;
> +
> + if (!sys->matrix)
> + return;
> + do {
> + frame_recovered = 0;
> + for (i = 0; i < sys->n_tot; i++) {
> + if (!sys->matrix[i]) {
> + frame_recovered = recover_frame(access, i);
> + }
> + }
> + } while (frame_recovered > 0);
> +
> + vlc_fifo_Lock(sys->fifo);
> + for (i = 0; i < sys->n_tot; i++) {
> + if (sys->matrix[i]) {
> + fifo_enqueue(access, sys->matrix[i]);
> + vlc_sem_post(&sys->semaphore);
> + sys->matrix[i] = NULL;
> + } else {
> + msg_Warn(access, "Warning: Dozer frame %u definitely missing!", i);
> + }
> + }
> + for (i = 0; i < sys->n_rows; i++) {
> + if (sys->fec_rows[i]) {
> + block_Release(sys->fec_rows[i]);
> + sys->fec_rows[i] = NULL;
> + }
> + }
> + for (i = 0; i < sys->n_cols; i++) {
> + if (sys->fec_cols[i]) {
> + block_Release(sys->fec_cols[i]);
> + sys->fec_cols[i] = NULL;
> + }
> + }
> + vlc_fifo_Unlock(sys->fifo);
> +}
> +
> +static int frame_to_matrix(access_t *access, block_t *pkt)
> +{
> + access_sys_t *sys = access->p_sys;
> + struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
> + uint16_t win;
> + uint16_t idx;
> +
> + idx = ntohs(hdr->seq);
> + win = ntohs(hdr->win);
> + if (sys->fec_cur_win != win) {
> + if (((uint16_t)(sys->fec_cur_win - win)) > 3) {
> + /* Next win started. */
> + flush_win(access);
> + sys->fec_cur_win = win;
> + sys->fec_last_rx = 0;
> + sys->matrix[idx] = pkt;
> + } else {
> + /* Probably old frame. Forward. */
> + msg_Dbg(access, "Received old? win: %d cur: %d", win, sys->fec_cur_win);
> + vlc_fifo_Lock(sys->fifo);
> + fifo_enqueue(access, pkt);
> + vlc_sem_post(&sys->semaphore);
> + vlc_fifo_Unlock(sys->fifo);
> + }
> + } else {
> + sys->matrix[idx] = pkt;
> + if (sys->fec_last_rx + 1 == idx) {
> + sys->fec_last_rx = idx;
> + while ((sys->fec_last_rx < (sys->n_tot - 1)) && (sys->matrix[sys->fec_last_rx + 1] != NULL)) {
> + sys->fec_last_rx++;
> + }
> + return idx;
> + }
> + }
> + return idx;
> +}
> +
> +static void process_fec(access_t *access, block_t *pkt)
> +{
> + access_sys_t *sys = access->p_sys;
> + struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
> + uint16_t pkt_type = ntohs(hdr->type);
> + uint16_t pkt_sign = ntohs(hdr->signature);
> +
> + if (pkt_sign != SIGNATURE) {
> + msg_Dbg(access, "Received invalid packet with sig: %04x. Discarding.", pkt_sign);
> + block_Release(pkt);
> + return;
> + }
> + if (pkt_type != FEC_TYPE_DATA) {
> + if (!sys->fec_initialized)
> + process_fec_initials(access, pkt);
> + else
> + fec_try_recovery(access, pkt);
> + } else {
> + if (!sys->fec_initialized)
> + block_Release(pkt);
> + else
> + frame_to_matrix(access, pkt);
> + }
> +}
> +
> +/*****************************************************************************
> + * ThreadRead: Pull packets from socket as soon as possible.
> + *****************************************************************************/
> +static void* ThreadRead( void *data )
> +{
> + access_t *access = data;
> + access_sys_t *sys = access->p_sys;
> +
> + for(;;)
> + {
> + block_t *pkt = block_Alloc(sys->mtu);
> + if (unlikely(pkt == NULL))
> + { /* OOM - dequeue and discard one packet */
> + char dummy;
> + recv(sys->fd, &dummy, 1, 0);
> + continue;
> + }
> +
> + struct iovec iov = {
> + .iov_base = pkt->p_buffer,
> + .iov_len = sys->mtu,
> + };
> + struct msghdr msg = {
> + .msg_iov = &iov,
> + .msg_iovlen = 1,
> +#ifdef __linux__
> + .msg_flags = MSG_TRUNC,
> +#endif
Why __linux here....
> + };
> + ssize_t len;
> + block_cleanup_push(pkt);
> + do
> + {
> + int poll_return=0;
> + struct pollfd ufd[1];
> + ufd[0].fd = sys->fd;
> + ufd[0].events = POLLIN;
> +
> + while ((poll_return = poll(ufd, 1, sys->timeout)) < 0); /* cancellation point */
> + if (unlikely( poll_return == 0))
> + {
> + msg_Err( access, "Timeout on receiving, timeout %d seconds", sys->timeout/1000 );
> + atomic_store(&sys->timeout_reached, true);
> + vlc_sem_post(&sys->semaphore);
> + len=0;
> + break;
> + }
> + len = recvmsg(sys->fd, &msg, 0);
> + }
> + while (len == -1);
> + vlc_cleanup_pop();
> +
> +#ifdef MSG_TRUNC
and ifdef MSG_TRUNC here?
> + if (msg.msg_flags & MSG_TRUNC)
> + {
> + msg_Err(access, "%zd bytes packet truncated (MTU was %zu)",
> + len, sys->mtu);
> + pkt->i_flags |= BLOCK_FLAG_CORRUPTED;
> + sys->mtu = len;
> + }
> + else
> +#endif
> + pkt->i_buffer = len;
> +
> + process_fec(access, pkt);
> + }
> + return NULL;
> +}
> diff --git a/modules/access_output/Makefile.am b/modules/access_output/Makefile.am
> index 7c93116..03deb12 100644
> --- a/modules/access_output/Makefile.am
> +++ b/modules/access_output/Makefile.am
> @@ -6,12 +6,15 @@ libaccess_output_file_plugin_la_LIBADD = $(LIBPTHREAD)
> libaccess_output_http_plugin_la_SOURCES = access_output/http.c
> libaccess_output_udp_plugin_la_SOURCES = access_output/udp.c
> libaccess_output_udp_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
> +libaccess_output_dozer_plugin_la_SOURCES = access_output/dozer.c
> +libaccess_output_dozer_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
Same question as above.
>
> access_out_LTLIBRARIES = \
> libaccess_output_dummy_plugin.la \
> libaccess_output_file_plugin.la \
> libaccess_output_http_plugin.la \
> - libaccess_output_udp_plugin.la
> + libaccess_output_udp_plugin.la \
> + libaccess_output_dozer_plugin.la
>
> libaccess_output_livehttp_plugin_la_SOURCES = access_output/livehttp.c
> libaccess_output_livehttp_plugin_la_CFLAGS = $(AM_CFLAGS) $(GCRYPT_CFLAGS)
> diff --git a/modules/access_output/dozer.c b/modules/access_output/dozer.c
> new file mode 100644
> index 0000000..87acda0
> --- /dev/null
> +++ b/modules/access_output/dozer.c
> @@ -0,0 +1,673 @@
> +/*****************************************************************************
> + * dozer.c
> + *****************************************************************************
> + * Copyright (C) 2016 Daniele Lacamera, Sergio Ammirata
> + * $Id$
> + *
> + * Authors: Daniele Lacamera <root at danielinux.net>
> + * Sergio Ammirata <sergio at ammirata.net>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation; either version 2.1 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program; if not, write to the Free Software Foundation,
> + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> + *****************************************************************************/
> +
> +/*****************************************************************************
> + * Preamble
> + *****************************************************************************/
> +#ifdef HAVE_CONFIG_H
> +# include "config.h"
> +#endif
> +
> +#include <vlc_common.h>
> +#include <vlc_plugin.h>
> +
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include <assert.h>
> +#include <errno.h>
> +
> +#include <vlc_sout.h>
> +#include <vlc_block.h>
> +
> +#ifdef _WIN32
> +# include <winsock2.h>
> +# include <ws2tcpip.h>
> +#else
> +# include <sys/socket.h>
> +#endif
> +
> +#include <vlc_network.h>
> +
> +#define MAX_EMPTY_BLOCKS 200
> +
> +#define SIGNATURE 0xD023
> +
> +
> +/* Fec data */
> +
> +#define DEFAULT_DOZER_FEC_ROWS 5
> +#define DEFAULT_DOZER_FEC_COLS 10
> +
> +#define FEC_TYPE_DATA 0x0000
> +#define FEC_TYPE_FEC_ROW 0x1000
> +#define FEC_TYPE_FEC_COL 0x2000
> +
> +static uint64_t i_seq = 0;
> +
> +struct __attribute__((packed)) dozer_hdr {
> + uint16_t signature;
> + uint16_t type;
> + uint16_t seq;
> + uint16_t win;
> + uint16_t count;
> + uint16_t size;
> +};
> +
> +/*****************************************************************************
> + * Module descriptor
> + *****************************************************************************/
> +static int Open ( vlc_object_t * );
> +static void Close( vlc_object_t * );
> +
> +#define SOUT_CFG_PREFIX "sout-dozer-"
> +
> +#define COLS_TEXT N_("FEC 2022 Columns")
> +#define COLS_LONGTEXT N_( \
> + "Number of Columns to use for the FEC 2022 matrix. Default is 10 cols and" \
> + " 5 rows for 30% overhead and about 3% packet loss recovery" )
> +
> +#define ROWS_TEXT N_("FEC 2022 Rows")
> +#define ROWS_LONGTEXT N_( \
> + "Number of Rows to use for the FEC 2022 matrix. Default is 5 rows and" \
> + " 10 cols for 30% overhead and about 3% packet loss recovery" )
> +
> +#define CACHING_TEXT N_("Caching value (ms)")
> +#define CACHING_LONGTEXT N_( \
> + "Default caching value for outbound UDP Dozer streams. This " \
> + "value should be set in milliseconds." )
> +
> +#define GROUP_TEXT N_("Group packets")
> +#define GROUP_LONGTEXT N_("Packets can be sent one by one at the right time " \
> + "or by groups. You can choose the number " \
> + "of packets that will be sent at a time. It " \
> + "helps reducing the scheduling load on " \
> + "heavily-loaded systems." )
> +
> +#define DOZER_LONGTEXT N_("Dozer output module: This module provides an implementation " \
> + "of a subset of the dozer network protocol created by Sergio Ammirata. " \
> + "In this implementation, the buffer/delay size is fixed and equal to " \
> + "1316 * 8 * cols * rows / bps; For example, for a 1Mbps stream with the " \
> + "default 10x5 matrix with 30% overhead, we would have a 500ms buffer and " \
> + "and an adequate recovery of up to 3% packet loss. The protocol is UDP " \
> + "based and the recovery algorithm is based on SMPTE2022." )
> +
> +vlc_module_begin ()
> + set_description( DOZER_LONGTEXT )
> + set_shortname( "DOZER" )
> + set_category( CAT_SOUT )
> + set_subcategory( SUBCAT_SOUT_ACO )
> + add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_PTS_DELAY / 1000, CACHING_TEXT, CACHING_LONGTEXT, true )
> + add_integer( SOUT_CFG_PREFIX "group", 1, GROUP_TEXT, GROUP_LONGTEXT, true )
> + add_integer( SOUT_CFG_PREFIX "rows", DEFAULT_DOZER_FEC_ROWS, ROWS_TEXT, ROWS_LONGTEXT, true)
> + add_integer( SOUT_CFG_PREFIX "cols", DEFAULT_DOZER_FEC_COLS, COLS_TEXT, COLS_LONGTEXT, true)
> + set_capability( "sout access", 0 )
> + add_shortcut( "dozer", "dozer2022" )
> + set_callbacks( Open, Close )
> +vlc_module_end ()
> +
> +/*****************************************************************************
> + * Exported prototypes
> + *****************************************************************************/
> +
> +static const char *const ppsz_sout_options[] = {
> + "caching",
> + "group",
> + "rows",
> + "cols",
> + NULL
> +};
> +
> +/* Options handled by the libvlc network core */
> +static const char *const ppsz_core_options[] = {
> + "dscp",
> + "ttl",
> + "miface",
> + NULL
> +};
> +
> +static ssize_t Write ( sout_access_out_t *, block_t * );
> +static int Seek ( sout_access_out_t *, off_t );
> +static int Control( sout_access_out_t *, int, va_list );
> +
> +static void* ThreadWrite( void * );
> +static block_t *NewUDPPacket( sout_access_out_t *, mtime_t );
> +
> +struct sout_access_out_sys_t
> +{
> + mtime_t i_caching;
> + int i_handle;
> + bool b_mtu_warning;
> + size_t i_mtu;
> +
> + block_fifo_t *p_fifo;
> + block_fifo_t *p_empty_blocks;
> + block_t *p_buffer;
> +
> + block_t **fec_rows;
> + block_t **fec_cols;
> +
> + int i_rows;
> + int i_cols;
> + int i_fecsize;
> +
> + vlc_thread_t thread;
> +};
> +
> +
> +
> +/*****************************************************************************
> + * FEC functions
> + *****************************************************************************/
> +static void fec_xor(block_t *dst, block_t *f1, block_t *f2)
> +{
> + uint32_t len, len1, len2;
> + uint32_t i;
> +
> +
> + len1 = f1->i_buffer;
> + len2 = f2->i_buffer;
> + len = len1;
> +
> + if (len2 > len1)
> + len = len2;
> +
> + if (len < 10)
> + return;
> +
> + for(i = 10; i < len; i++) {
> + if (i >= len1) {
> + dst->p_buffer[i] = f2->p_buffer[i];
> + } else if (i >= len2) {
> + dst->p_buffer[i] = f1->p_buffer[i];
> + }
> + else dst->p_buffer[i] = f1->p_buffer[i] ^ f2->p_buffer[i];
> + }
> + dst->i_buffer = len;
> +}
> +
> +#define DEFAULT_PORT 1234
> +
> +/*****************************************************************************
> + * Open: open the file
> + *****************************************************************************/
> +static int Open( vlc_object_t *p_this )
> +{
> + sout_access_out_t *p_access = (sout_access_out_t*)p_this;
> + sout_access_out_sys_t *p_sys;
> +
> + char *psz_dst_addr = NULL;
> + int i_dst_port;
> +
> + int i_handle;
> +
> + msg_Dbg( p_access, "Dozer: Called Open()");
> + config_ChainParse( p_access, SOUT_CFG_PREFIX,
> + ppsz_sout_options, p_access->p_cfg );
> + config_ChainParse( p_access, "",
> + ppsz_core_options, p_access->p_cfg );
> +
> + if (var_Create (p_access, "dst-port", VLC_VAR_INTEGER)
> + || var_Create (p_access, "src-port", VLC_VAR_INTEGER)
> + || var_Create (p_access, "dst-addr", VLC_VAR_STRING)
> + || var_Create (p_access, "src-addr", VLC_VAR_STRING))
> + {
> + return VLC_ENOMEM;
> + }
> +
> + if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
> + return VLC_ENOMEM;
> + p_access->p_sys = p_sys;
> +
> + i_dst_port = DEFAULT_PORT;
> + char *psz_parser = psz_dst_addr = strdup( p_access->psz_path );
> + if( !psz_dst_addr )
> + {
> + free( p_sys );
> + return VLC_ENOMEM;
> + }
> +
> + if (psz_parser[0] == '[')
> + psz_parser = strchr (psz_parser, ']');
> +
> + psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
> + if (psz_parser != NULL)
> + {
> + *psz_parser++ = '\0';
> + i_dst_port = atoi (psz_parser);
> + }
> +
> + i_handle = net_ConnectDgram( p_this, psz_dst_addr, i_dst_port, -1,
> + IPPROTO_UDP );
> + free (psz_dst_addr);
> +
> + if( i_handle == -1 )
> + {
> + msg_Err( p_access, "failed to create raw UDP socket" );
> + free (p_sys);
> + return VLC_EGENERIC;
> + }
> + else
> + {
> + char addr[NI_MAXNUMERICHOST];
> + int port;
> +
> + if (net_GetSockAddress (i_handle, addr, &port) == 0)
> + {
> + msg_Dbg (p_access, "source: %s port %d", addr, port);
> + var_SetString (p_access, "src-addr", addr);
> + var_SetInteger (p_access, "src-port", port);
> + }
> +
> + if (net_GetPeerAddress (i_handle, addr, &port) == 0)
> + {
> + msg_Dbg (p_access, "destination: %s port %d", addr, port);
> + var_SetString (p_access, "dst-addr", addr);
> + var_SetInteger (p_access, "dst-port", port);
> + }
> + }
> + shutdown( i_handle, SHUT_RD );
> +
> + p_sys->i_rows = var_GetInteger( p_access, SOUT_CFG_PREFIX "rows" );
> + p_sys->i_cols = var_GetInteger( p_access, SOUT_CFG_PREFIX "cols" );
> + p_sys->i_fecsize = p_sys->i_rows * p_sys->i_cols;
> + p_sys->i_caching = UINT64_C(1000)
> + * var_GetInteger( p_access, SOUT_CFG_PREFIX "caching");
> + p_sys->i_handle = i_handle;
> + p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" );
> + p_sys->b_mtu_warning = false;
> + p_sys->p_fifo = block_FifoNew();
> + p_sys->p_empty_blocks = block_FifoNew();
> + p_sys->p_buffer = NULL;
> +
> +
> + p_sys->fec_rows = calloc(p_sys->i_rows * sizeof(block_t *), 1);
> + if (!p_sys->fec_rows) {
> + msg_Err( p_access, "cannot allocate FEC matrix" );
> + block_FifoRelease( p_sys->p_fifo );
> + block_FifoRelease( p_sys->p_empty_blocks );
> + net_Close (i_handle);
> + free (p_sys);
> + return VLC_EGENERIC;
> + }
> + p_sys->fec_cols = calloc(p_sys->i_cols * sizeof(block_t *), 1);
> + if (!p_sys->fec_cols) {
> + free(p_sys->fec_rows);
> + msg_Err( p_access, "cannot allocate FEC matrix" );
> + block_FifoRelease( p_sys->p_fifo );
> + block_FifoRelease( p_sys->p_empty_blocks );
> + net_Close (i_handle);
> + free (p_sys);
> + return VLC_EGENERIC;
> + }
> + if( vlc_clone( &p_sys->thread, ThreadWrite, p_access,
> + VLC_THREAD_PRIORITY_HIGHEST ) )
> + {
> + msg_Err( p_access, "cannot spawn sout access thread" );
> + block_FifoRelease( p_sys->p_fifo );
> + block_FifoRelease( p_sys->p_empty_blocks );
> + net_Close (i_handle);
> + free (p_sys);
> + free(p_sys->fec_rows);
> + free(p_sys->fec_cols);
> + return VLC_EGENERIC;
> + }
> +
> + p_access->pf_write = Write;
> + p_access->pf_seek = Seek;
> + p_access->pf_control = Control;
> +
> + return VLC_SUCCESS;
> +}
> +
> +/*****************************************************************************
> + * Close: close the target
> + *****************************************************************************/
> +static void Close( vlc_object_t * p_this )
> +{
> + sout_access_out_t *p_access = (sout_access_out_t*)p_this;
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> +
> + vlc_cancel( p_sys->thread );
> + vlc_join( p_sys->thread, NULL );
> + block_FifoRelease( p_sys->p_fifo );
> + block_FifoRelease( p_sys->p_empty_blocks );
> +
> + if( p_sys->p_buffer ) block_Release( p_sys->p_buffer );
> +
> + net_Close( p_sys->i_handle );
> + free( p_sys );
> +}
> +
> +static int Control( sout_access_out_t *p_access, int i_query, va_list args )
> +{
> + (void)p_access;
> +
> + switch( i_query )
> + {
> + case ACCESS_OUT_CONTROLS_PACE:
> + *va_arg( args, bool * ) = false;
> + break;
> +
> + default:
> + return VLC_EGENERIC;
> + }
> + return VLC_SUCCESS;
> +}
> +
> +/*****************************************************************************
> + * Write: standard write on a file descriptor.
> + *****************************************************************************/
> +
> +static void reset_fec(sout_access_out_t *p_access)
> +{
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + int i;
> + mtime_t now = mdate();
> + for (i = 0; i < p_sys->i_rows; i++) {
> + p_access->p_sys->fec_rows[i] = NewUDPPacket(p_access, now);
> + }
> + for (i = 0; i < p_sys->i_cols; i++) {
> + p_access->p_sys->fec_cols[i] = NewUDPPacket(p_access, now);
> + }
> +}
> +
> +static block_t *process_fec_row(sout_access_out_t *p_access, block_t *p_buffer, uint64_t seq)
> +{
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + uint16_t win = seq / p_sys->i_fecsize;
> + uint16_t iseq = seq % p_sys->i_fecsize;
> + uint16_t row = iseq / p_sys->i_cols;
> + uint16_t col = iseq % p_sys->i_cols;
> +
> + if (iseq == 0) {
> + reset_fec(p_access);
> + }
> +
> + /* Update row fec */
> + fec_xor(p_access->p_sys->fec_rows[row], p_access->p_sys->fec_rows[row], p_buffer);
> + /* Last in row? Enqueue. */
> + if (col == (p_sys->i_cols - 1)) {
> + block_t *fec = p_access->p_sys->fec_rows[row];
> + struct dozer_hdr *hdr = (struct dozer_hdr *)fec->p_buffer;
> + hdr->signature = htons(SIGNATURE);
> + hdr->type = htons(FEC_TYPE_FEC_ROW);
> + hdr->win = htons(win);
> + hdr->seq = htons(row);
> + hdr->count = htons(p_sys->i_rows);
> + fec->i_dts = p_buffer->i_dts;
> + return fec;
> + }
> + return NULL;
> +}
> +
> +static block_t *process_fec_col(sout_access_out_t *p_access, block_t *p_buffer, uint64_t seq)
> +{
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + uint16_t win = seq / p_sys->i_fecsize;
> + uint16_t iseq = seq % p_sys->i_fecsize;
> + uint16_t row = iseq / p_sys->i_cols;
> + uint16_t col = iseq % p_sys->i_cols;
> +
> + /* Update col fec */
> + fec_xor(p_access->p_sys->fec_cols[col], p_access->p_sys->fec_cols[col], p_buffer);
> + /* Last in row? Enqueue. */
> + if (row == (p_sys->i_rows - 1)) {
> + block_t *fec = p_access->p_sys->fec_cols[col];
> + struct dozer_hdr *hdr = (struct dozer_hdr *)fec->p_buffer;
> + hdr->signature = htons(SIGNATURE);
> + hdr->type = htons(FEC_TYPE_FEC_COL);
> + hdr->win = htons(win);
> + hdr->seq = htons(col);
> + hdr->count = htons(p_sys->i_cols);
> + fec->i_dts = p_buffer->i_dts;
> + return fec;
> + }
> + return NULL;
> +}
> +
> +static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
> +{
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + int i_len = 0;
> +
> + while( p_buffer )
> + {
> + block_t *p_next;
> + int i_packets = 0;
> + mtime_t now = mdate();
> + struct dozer_hdr *hdr;
> + block_t *fec_c, *fec_r;
> +
> + if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_mtu )
> + {
> + msg_Warn( p_access, "packet size > MTU, you should probably "
> + "increase the MTU" );
> + p_sys->b_mtu_warning = true;
> + }
> +
> + /* Check if there is enough space in the buffer */
> + if( p_sys->p_buffer &&
> + p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu )
> + {
> + if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
> + {
> + msg_Dbg( p_access, "late packet for UDP input (%"PRId64 ")",
> + now - p_sys->p_buffer->i_dts
> + - p_sys->i_caching );
> + }
> + /* Populate header */
> + hdr = (struct dozer_hdr *)(p_sys->p_buffer->p_buffer);
> + hdr->size = htons(p_sys->p_buffer->i_buffer - sizeof(struct dozer_hdr));
> + fec_r = process_fec_row(p_access, p_sys->p_buffer, i_seq - 1);
> + fec_c = process_fec_col(p_access, p_sys->p_buffer, i_seq - 1);
> + block_FifoPut( p_sys->p_fifo, p_sys->p_buffer );
> + p_sys->p_buffer = NULL;
> + if (fec_r)
> + block_FifoPut(p_sys->p_fifo, fec_r);
> + if (fec_c)
> + block_FifoPut(p_sys->p_fifo, fec_c);
> + }
> +
> + i_len += p_buffer->i_buffer;
> + while( p_buffer->i_buffer )
> + {
> + size_t i_payload_size = p_sys->i_mtu;
> + size_t i_write = __MIN( p_buffer->i_buffer, i_payload_size );
> +
> + i_packets++;
> +
> + if( !p_sys->p_buffer )
> + {
> + p_sys->p_buffer = NewUDPPacket( p_access, p_buffer->i_dts );
> + if( !p_sys->p_buffer ) break;
> +
> + /* Populate header */
> + hdr = (struct dozer_hdr *)(p_sys->p_buffer->p_buffer);
> + hdr->signature = htons(SIGNATURE);
> + hdr->type = htons(FEC_TYPE_DATA);
> + hdr->seq = htons(i_seq % p_sys->i_fecsize);
> + hdr->win = htons((i_seq / p_sys->i_fecsize) & 0xFFFF);
> + i_seq++;
> + p_sys->p_buffer->i_buffer += sizeof(struct dozer_hdr);
> + } else
> + hdr = (struct dozer_hdr *)(p_sys->p_buffer->p_buffer);
> +
> + memcpy( p_sys->p_buffer->p_buffer + p_sys->p_buffer->i_buffer,
> + p_buffer->p_buffer, i_write );
> +
> + p_sys->p_buffer->i_buffer += i_write;
> + p_buffer->p_buffer += i_write;
> + p_buffer->i_buffer -= i_write;
> + if ( p_buffer->i_flags & BLOCK_FLAG_CLOCK )
> + {
> + if ( p_sys->p_buffer->i_flags & BLOCK_FLAG_CLOCK )
> + msg_Warn( p_access, "putting two PCRs at once" );
> + p_sys->p_buffer->i_flags |= BLOCK_FLAG_CLOCK;
> + }
> +
> + if( p_sys->p_buffer->i_buffer == p_sys->i_mtu || i_packets > 1 )
> + {
> + /* Flush */
> + if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
> + {
> + msg_Dbg( p_access, "late packet for udp input (%"PRId64 ")",
> + mdate() - p_sys->p_buffer->i_dts
> + - p_sys->i_caching );
> + }
> + /* populate packet size */
> + hdr->size = htons(p_sys->p_buffer->i_buffer - sizeof(struct dozer_hdr));
> + fec_r = process_fec_row(p_access, p_sys->p_buffer, i_seq - 1);
> + fec_c = process_fec_col(p_access, p_sys->p_buffer, i_seq - 1);
> + block_FifoPut( p_sys->p_fifo, p_sys->p_buffer );
> + p_sys->p_buffer = NULL;
> + if (fec_r)
> + block_FifoPut(p_sys->p_fifo, fec_r);
> + if (fec_c)
> + block_FifoPut(p_sys->p_fifo, fec_c);
> + }
> + }
> +
> + p_next = p_buffer->p_next;
> + block_Release( p_buffer );
> + p_buffer = p_next;
> + }
> + return i_len;
> +}
> +
> +/*****************************************************************************
> + * Seek: seek to a specific location in a file
> + *****************************************************************************/
> +static int Seek( sout_access_out_t *p_access, off_t i_pos )
> +{
> + (void) i_pos;
> + msg_Err( p_access, "UDP sout access cannot seek" );
> + return -1;
> +}
> +
> +/*****************************************************************************
> + * NewUDPPacket: allocate a new UDP packet of size p_sys->i_mtu
> + *****************************************************************************/
> +static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
> +{
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + block_t *p_buffer;
> +
> +
> + while ( block_FifoCount( p_sys->p_empty_blocks ) > MAX_EMPTY_BLOCKS )
> + {
> + p_buffer = block_FifoGet( p_sys->p_empty_blocks );
> + block_Release( p_buffer );
> + }
> +
> + if( block_FifoCount( p_sys->p_empty_blocks ) == 0 )
> + {
> + p_buffer = block_Alloc( p_sys->i_mtu + sizeof(struct dozer_hdr));
> + }
> + else
> + {
> + p_buffer = block_FifoGet(p_sys->p_empty_blocks );
> + p_buffer->i_flags = 0;
> + p_buffer = block_Realloc( p_buffer, 0, p_sys->i_mtu + sizeof(struct dozer_hdr));
> + }
> +
> + p_buffer->i_dts = i_dts;
> + p_buffer->i_buffer = 0;
> +
> + return p_buffer;
> +}
> +
> +/*****************************************************************************
> + * ThreadWrite: Write a packet on the network at the good time.
> + *****************************************************************************/
> +static void* ThreadWrite( void *data )
> +{
> + sout_access_out_t *p_access = data;
> + sout_access_out_sys_t *p_sys = p_access->p_sys;
> + mtime_t i_date_last = -1;
> + const unsigned i_group = var_GetInteger( p_access,
> + SOUT_CFG_PREFIX "group" );
> + mtime_t i_to_send = i_group;
> + unsigned i_dropped_packets = 0;
> + msg_Dbg( p_access, "Dozer: Sending thread started.");
> +
> + for (;;)
> + {
> + block_t *p_pk = block_FifoGet( p_sys->p_fifo );
> + mtime_t i_date, i_sent;
> +
> + i_date = p_sys->i_caching + p_pk->i_dts;
> + if( i_date_last > 0 )
> + {
> + if( i_date - i_date_last > 2000000 )
> + {
> + if( !i_dropped_packets )
> + msg_Dbg( p_access, "mmh, hole (%"PRId64" > 2s) -> drop",
> + i_date - i_date_last );
> +
> + block_FifoPut( p_sys->p_empty_blocks, p_pk );
> +
> + i_date_last = i_date;
> + i_dropped_packets++;
> + continue;
> + }
> + else if( i_date - i_date_last < -1000 )
> + {
> + if( !i_dropped_packets )
> + msg_Dbg( p_access, "mmh, packets in the past (%"PRId64")",
> + i_date_last - i_date );
> + }
> + }
> +
> + block_cleanup_push( p_pk );
> + i_to_send--;
> + if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
> + {
> + mwait( i_date );
> + i_to_send = i_group;
> + }
> + if ( send( p_sys->i_handle, p_pk->p_buffer, p_pk->i_buffer, 0 ) == -1 )
> + msg_Warn( p_access, "send error: %s", vlc_strerror_c(errno) );
> + vlc_cleanup_pop();
> +
> + if( i_dropped_packets )
> + {
> + msg_Dbg( p_access, "dropped %i packets", i_dropped_packets );
> + i_dropped_packets = 0;
> + }
> +
> +#if 1
> + i_sent = mdate();
> + if ( i_sent > i_date + 20000 )
> + {
> + msg_Dbg( p_access, "packet has been sent too late (%"PRId64 ")",
> + i_sent - i_date );
> + }
> +#endif
> +
> + block_FifoPut( p_sys->p_empty_blocks, p_pk );
> +
> + i_date_last = i_date;
> + }
> + return NULL;
> +}
> --
> 2.8.1
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel
--
With my kindest regards,
--
Jean-Baptiste Kempf
http://www.jbkempf.com/ - +33 672 704 734
Sent from my Electronic Device
More information about the vlc-devel
mailing list