[vlc-devel] [PATCH] Implementation of Dozer streaming protocol

Rémi Denis-Courmont remi at remlab.net
Sun Oct 2 01:59:00 CEST 2016


Hello,

Reviewing attachments is a PITA. Consider posting patches inline, as with
git-send-email. Also, don't sign off your own patches; that's meaningless.

Le vendredi 30 septembre 2016, 14:14:43 Daniele Lacamera a écrit :
> Added access + access_output modules implementing Dozer streaming protocol.
> 
> A lua plugin for wireshark dissector is available at
> https://gist.github.com/danielinux/d9e854962e88b33ee77d41009fc0497f
> 
> access/Makefile.am        |    4
> access/dozer.c            |  723
> ++++++++++++++++++++++++++++++++++++++++++++++
> access_output/Makefile.am |    5
> access_output/dozer.c     |  673 ++++++++++++++++++++++++++++++++++++++++++
> 4 files changed, 1404 insertions(+), 1 deletion(-)

Should be separate patches for input and output.

>From 3db8c943fbf1476477b6810ef12d7d68fec7c942 Mon Sep 17 00:00:00 2001
From: Daniele Lacamera <root at danielinux.net>
Date: Fri, 30 Sep 2016 12:53:29 +0200
Subject: [PATCH] Added access + access_output modules implementing Dozer
 streaming protocol.

This is an implementation of a subset of the dozer network protocol
created by Sergio Ammirata. The protocol is UDP based and the recovery
algorithm is based on SMPTE2022. In this implementation, the
buffer/delay size is fixed and equal to 1316 * 8 * cols * rows / bps.
For example, for a 1Mbps stream with the default 10x5 matrix with 30%
overhead, we would have a 500ms buffer and an adequate recovery of
up to 3% packet loss.

Signed-off-by: Daniele Lacamera <root at danielinux.net>
Signed-off-by: Sergio Ammirata <sergio at ammirata.net>
---
 modules/access/Makefile.am        |   4 +
 modules/access/dozer.c            | 723 
++++++++++++++++++++++++++++++++++++++
 modules/access_output/Makefile.am |   5 +-
 modules/access_output/dozer.c     | 673 +++++++++++++++++++++++++++++++++++
 4 files changed, 1404 insertions(+), 1 deletion(-)
 create mode 100644 modules/access/dozer.c
 create mode 100644 modules/access_output/dozer.c

diff --git a/modules/access/Makefile.am b/modules/access/Makefile.am
index 061f2b1..cbd0b57 100644
--- a/modules/access/Makefile.am
+++ b/modules/access/Makefile.am
@@ -401,6 +401,10 @@ libudp_plugin_la_SOURCES = access/udp.c
 libudp_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
 access_LTLIBRARIES += libudp_plugin.la
 
+libdozer_plugin_la_SOURCES = access/dozer.c
+libdozer_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
+access_LTLIBRARIES += libdozer_plugin.la
+
 libsftp_plugin_la_SOURCES = access/sftp.c
 libsftp_plugin_la_CFLAGS = $(AM_CFLAGS) $(SFTP_CFLAGS)
 libsftp_plugin_la_LIBADD = $(SFTP_LIBS)
diff --git a/modules/access/dozer.c b/modules/access/dozer.c
new file mode 100644
index 0000000..5f42263
--- /dev/null
+++ b/modules/access/dozer.c
@@ -0,0 +1,723 @@
+/*****************************************************************************
+ * dozer.c
+ 
*****************************************************************************
+ * Copyright (C) 2016 Daniele Lacamera, Sergio Ammirata
+ * $Id$
+ *
+ * Authors: Daniele Lacamera <root at danielinux.net>
+ *          Sergio Ammirata <sergio at ammirata.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ 
*****************************************************************************/
+
+/*****************************************************************************
+ * Preamble
+ 
*****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <errno.h>
+#include <vlc_common.h>
+#include <vlc_plugin.h>
+#include <vlc_access.h>
+#include <vlc_network.h>
+#include <vlc_block.h>
+#include <vlc_interrupt.h>
+#include <vlc_atomic.h>
+#ifdef HAVE_POLL
+# include <poll.h>
+#endif
+#include <fcntl.h>
+
+#define SIGNATURE 0xD023
+
+/* Fec data */
+#define FEC_TYPE_DATA     0x0000
+#define FEC_TYPE_FEC_ROW  0x1000
+#define FEC_TYPE_FEC_COL  0x2000
+
+struct __attribute__((packed)) dozer_hdr {

That's not portable.

+    uint16_t signature;
+    uint16_t type;
+    uint16_t seq;
+    uint16_t win;
+    uint16_t count;
+    uint16_t size;
+};
+
+
+/*****************************************************************************
+ * Module descriptor
+ 
*****************************************************************************/
+static int  Open( vlc_object_t * );
+static void Close( vlc_object_t * );
+
+#define BUFFER_TEXT N_("UDP Receive buffer")
+#define BUFFER_LONGTEXT N_("UDP receive buffer size (bytes)" )
+#define TIMEOUT_TEXT N_("UDP Source timeout (sec)")
+
+/* Text passed to set_description() below. Each literal is kept on a single
+ * source line so the macro stays one logical line. */
+#define DOZER_LONGTEXT N_( \
+    "Dozer input module: This module provides an implementation " \
+    "of a subset of the dozer network protocol created by Sergio Ammirata. " \
+    "In this implementation, the buffer/delay size is fixed and equal to " \
+    "1316 * 8 * cols * rows / bps; For example, for a 1Mbps stream with the " \
+    "default 10x5 matrix with 30% overhead, we would have a 500ms buffer " \
+    "and an adequate recovery of up to 3% packet loss. The protocol is UDP " \
+    "based and the recovery algorithm is based on SMPTE2022." )
+
+/* Default value of the "udp-buffer" option, in bytes. */
+#define DEFAULT_DOZER_UDP_BUFFER 0x400000
+
+vlc_module_begin ()
+    set_shortname( N_("DOZER" ) )
+    set_description( DOZER_LONGTEXT )
+    set_category( CAT_INPUT )
+    set_subcategory( SUBCAT_INPUT_ACCESS )
+
+    /* Both options are read in Open() with var_CreateGetInteger(). */
+    add_integer( "udp-buffer", DEFAULT_DOZER_UDP_BUFFER, BUFFER_TEXT,
+                 BUFFER_LONGTEXT, true )
+    add_integer( "udp-timeout", -1, TIMEOUT_TEXT, NULL, true )
+
+    set_capability( "access", 0 )
+    add_shortcut( "dozer", "dozer2022" )
+
+    set_callbacks( Open, Close )
+vlc_module_end ()
+
+/* Private state of one Dozer access instance; allocated zeroed in Open(). */
+struct access_sys_t
+{
+    int fd;                 /* datagram socket returned by net_OpenDgram() */
+    int timeout;            /* "udp-timeout" option; multiplied by 1000 in Open() when positive, then passed to poll() */
+    size_t mtu;             /* size of every receive/rebuild block: 7 * 188 + sizeof(struct dozer_hdr) */
+    size_t fifo_size;       /* "udp-buffer" option; byte limit above which fifo_enqueue() drops old blocks */
+    block_fifo_t *fifo;     /* blocks queued by the reader thread and dequeued by BlockUDP() */
+
+    block_t **fec_cols;     /* column FEC packets of the current window, indexed by the packet's seq field */
+    block_t **fec_rows;     /* row FEC packets of the current window, indexed by the packet's seq field */
+    block_t **matrix;       /* data packets of the current window, indexed by the packet's seq field */
+
+    /* Matrix size */
+    uint16_t n_rows;        /* taken from the count field of a FEC_TYPE_FEC_ROW packet */
+    uint16_t n_cols;        /* taken from the count field of a FEC_TYPE_FEC_COL packet */
+    uint16_t n_tot;         /* n_cols * n_rows, the number of matrix slots */
+
+    /* Counters/flags */
+    uint16_t fec_cur_win;       /* window number (host byte order) currently being collected */
+    uint16_t fec_last_rx;       /* advanced while consecutive matrix slots are filled; reset to 0 on a new window */
+    uint16_t fec_initialized;   /* set to 1 once the matrix and FEC arrays have been allocated */
+
+    vlc_sem_t semaphore;        /* posted per queued block and on timeout; waited on in BlockUDP() */
+    vlc_thread_t thread;        /* reader thread running ThreadRead() */
+    atomic_bool timeout_reached;/* set by ThreadRead() when poll() times out; makes BlockUDP() report EOF */
+};
+
+/*****************************************************************************
+ * Local prototypes
+ 
*****************************************************************************/
+static block_t *BlockUDP( access_t *, bool * );
+static int Control( access_t *, int, va_list );
+static void* ThreadRead( void *data );
+
+/*****************************************************************************
+ * FEC functions
+ 
*****************************************************************************/
+static void fec_xor(block_t *dst, block_t *f1, block_t *f2)
+{
+    /* XOR f1 and f2 into dst, starting at byte 10 (the first ten header
+     * bytes are never touched). Where only one input is long enough, its
+     * byte is copied as-is. dst may alias f1. */
+    const uint32_t size_a = f1->i_buffer;
+    const uint32_t size_b = f2->i_buffer;
+    const uint32_t longest = (size_b > size_a) ? size_b : size_a;
+
+    /* Too short to contain anything past the skipped bytes: leave dst alone. */
+    if (longest < 10)
+        return;
+
+    for (uint32_t pos = 10; pos < longest; pos++) {
+        uint8_t out;
+
+        if (pos >= size_a)
+            out = f2->p_buffer[pos];
+        else if (pos >= size_b)
+            out = f1->p_buffer[pos];
+        else
+            out = f1->p_buffer[pos] ^ f2->p_buffer[pos];
+
+        dst->p_buffer[pos] = out;
+    }
+
+    dst->i_buffer = longest;
+}
+
+/*****************************************************************************
+ * Open: open the socket
+ 
*****************************************************************************/
+static int Open( vlc_object_t *p_this )
+{
+    access_t     *p_access = (access_t*)p_this;
+    access_sys_t *sys;
+
+    if( p_access->b_preparsing )
+        return VLC_EGENERIC;
+
+    sys = calloc( sizeof( *sys ), 1 );
+    if( unlikely( sys == NULL ) )
+        return VLC_ENOMEM;
+
+    p_access->p_sys = sys;
+
+    /* Set up p_access */
+    ACCESS_SET_CALLBACKS( NULL, BlockUDP, Control, NULL );
+
+    char *psz_name = strdup( p_access->psz_location );
+    char *psz_parser;
+    const char *psz_server_addr, *psz_bind_addr = "";
+    int  i_bind_port = 1234, i_server_port = 0;
+
+    if( unlikely(psz_name == NULL) )
+        goto error;
+
+    /* Parse psz_name syntax :
+     * [serveraddr[:serverport]][@[bindaddr]:[bindport]] */
+    psz_parser = strchr( psz_name, '@' );
+    if( psz_parser != NULL )
+    {
+        /* Found bind address and/or bind port */
+        *psz_parser++ = '\0';
+        psz_bind_addr = psz_parser;
+
+        if( psz_bind_addr[0] == '[' )
+            /* skips bracket'd IPv6 address */
+            psz_parser = strchr( psz_parser, ']' );
+
+        if( psz_parser != NULL )
+        {
+            psz_parser = strchr( psz_parser, ':' );
+            if( psz_parser != NULL )
+            {
+                *psz_parser++ = '\0';
+                i_bind_port = atoi( psz_parser );
+            }
+        }
+    }
+
+    psz_server_addr = psz_name;
+    psz_parser = ( psz_server_addr[0] == '[' )
+        ? strchr( psz_name, ']' ) /* skips bracket'd IPv6 address */
+        : psz_name;
+
+    if( psz_parser != NULL )
+    {
+        psz_parser = strchr( psz_parser, ':' );
+        if( psz_parser != NULL )
+        {
+            *psz_parser++ = '\0';
+            i_server_port = atoi( psz_parser );
+        }
+    }
+
+    msg_Dbg( p_access, "opening server=%s:%d local=%s:%d",
+             psz_server_addr, i_server_port, psz_bind_addr, i_bind_port );
+
+    sys->fd = net_OpenDgram( p_access, psz_bind_addr, i_bind_port,
+                             psz_server_addr, i_server_port, IPPROTO_UDP );
+    free( psz_name );
+        if( sys->fd == -1 )
+    {
+        msg_Err( p_access, "cannot open socket" );
+        goto error;
+    }

There's a lot of copy-paste from the UDP plugin here, above and below. First, 
that seems partly unjustified. And second, that is a violation of the moral 
right of attribution of the original authors (including myself).

+
+    /* Revert to blocking I/O */
+#ifndef _WIN32
+    fcntl(sys->fd, F_SETFL, fcntl(sys->fd, F_GETFL) & ~O_NONBLOCK);
+#else
+    ioctlsocket(sys->fd, FIONBIO, &(unsigned long){ 0 });
+#endif
+
+    sys->fifo = block_FifoNew();
+    if( unlikely( sys->fifo == NULL ) )
+    {
+        net_Close( sys->fd );
+        goto error;
+    }
+
+    sys->mtu = 7 * 188 + sizeof(struct dozer_hdr);
+    sys->fifo_size = var_CreateGetInteger( p_access, "udp-buffer");
+    vlc_sem_init( &sys->semaphore, 0 );
+
+    sys->timeout = var_CreateGetInteger( p_access, "udp-timeout");
+    atomic_init(&sys->timeout_reached, false);
+    if( sys->timeout > 0)
+        sys->timeout *= 1000;
+
+    if( vlc_clone( &sys->thread, ThreadRead, p_access,
+                   VLC_THREAD_PRIORITY_INPUT ) )
+    {
+        vlc_sem_destroy( &sys->semaphore );
+        block_FifoRelease( sys->fifo );
+        net_Close( sys->fd );
+error:
+        free( sys );
+        return VLC_EGENERIC;
+    }
+
+    return VLC_SUCCESS;
+}
+
+/*****************************************************************************
+ * Close: free unused data structures
+ 
*****************************************************************************/
+static void Close( vlc_object_t *p_this )
+{
+    access_t     *p_access = (access_t*)p_this;
+    access_sys_t *sys = p_access->p_sys;
+
+    /* Stop the reader first: it is the only other user of the FEC state. */
+    vlc_cancel( sys->thread );
+    vlc_join( sys->thread, NULL );
+
+    /* Release whatever the current FEC window still holds. The arrays are
+     * NULL if no FEC packet ever initialised the matrix. */
+    if( sys->matrix != NULL )
+        for( uint32_t i = 0; i < sys->n_tot; i++ )
+            if( sys->matrix[i] != NULL )
+                block_Release( sys->matrix[i] );
+    if( sys->fec_rows != NULL )
+        for( uint32_t i = 0; i < sys->n_rows; i++ )
+            if( sys->fec_rows[i] != NULL )
+                block_Release( sys->fec_rows[i] );
+    if( sys->fec_cols != NULL )
+        for( uint32_t i = 0; i < sys->n_cols; i++ )
+            if( sys->fec_cols[i] != NULL )
+                block_Release( sys->fec_cols[i] );
+    free( sys->matrix );
+    free( sys->fec_rows );
+    free( sys->fec_cols );
+
+    vlc_sem_destroy( &sys->semaphore );
+    block_FifoRelease( sys->fifo );
+    net_Close( sys->fd );
+    free( sys );
+}
+
+/*****************************************************************************
+ * Control:
+ 
*****************************************************************************/
+static int Control( access_t *p_access, int i_query, va_list args )
+{
+    /* PTS delay: the "network-caching" value scaled by 1000. */
+    if( i_query == STREAM_GET_PTS_DELAY )
+    {
+        int64_t *delay = va_arg( args, int64_t * );
+
+        *delay = INT64_C(1000)
+               * var_InheritInteger(p_access, "network-caching");
+        return VLC_SUCCESS;
+    }
+
+    /* Every capability query is answered with "false"; any other query is
+     * rejected. */
+    const bool capability_query = i_query == STREAM_CAN_SEEK
+                               || i_query == STREAM_CAN_FASTSEEK
+                               || i_query == STREAM_CAN_PAUSE
+                               || i_query == STREAM_CAN_CONTROL_PACE;
+    if( !capability_query )
+        return VLC_EGENERIC;
+
+    bool *answer = va_arg( args, bool * );
+    *answer = false;
+    return VLC_SUCCESS;
+}
+
+
+/*****************************************************************************
+ * BlockUDP:
+ 
*****************************************************************************/
+static block_t *BlockUDP( access_t *p_access, bool *restrict eof )
+{
+    access_sys_t *sys = p_access->p_sys;
+    const bool timed_out = atomic_load(&sys->timeout_reached);
+
+    if( !timed_out )
+    {
+        /* Wait until the reader thread posts the semaphore, then take the
+         * head of the FIFO under its lock. */
+        vlc_sem_wait_i11e(&sys->semaphore);
+
+        vlc_fifo_Lock(sys->fifo);
+        block_t *head = vlc_fifo_DequeueUnlocked(sys->fifo);
+        vlc_fifo_Unlock(sys->fifo);
+        return head;
+    }
+
+    /* The reader thread saw a receive timeout: report end of stream. */
+    *eof = true;
+    return NULL;
+}
+
+/* Strip the Dozer header from pkt and append it to the output FIFO.
+ *
+ * Only the *Unlocked FIFO calls are used, so the caller must hold the FIFO
+ * lock (both callers in this file do). pkt is consumed: it is either queued
+ * or released. */
+static void fifo_enqueue(access_t *access, block_t *pkt)
+{
+    access_sys_t *sys = access->p_sys;
+
+    /* A packet shorter than the header has no payload; stripping the header
+     * would make i_buffer wrap around. */
+    if (pkt->i_buffer < sizeof(struct dozer_hdr))
+    {
+        block_Release(pkt);
+        return;
+    }
+
+    /* Discard old buffers on overflow */
+    while (vlc_fifo_GetBytes(sys->fifo) + pkt->i_buffer > sys->fifo_size)
+    {
+        int canc = vlc_savecancel();
+        block_t *oldest = vlc_fifo_DequeueUnlocked(sys->fifo);
+        if (oldest != NULL)
+            block_Release(oldest);
+        vlc_restorecancel(canc);
+
+        /* Nothing left to drop: the packet alone exceeds the limit. Queue it
+         * anyway rather than release a NULL block or loop forever. */
+        if (oldest == NULL)
+            break;
+    }
+    pkt->p_buffer += sizeof(struct dozer_hdr);
+    pkt->i_buffer -= sizeof(struct dozer_hdr);
+    vlc_fifo_QueueUnlocked(sys->fifo, pkt);
+}
+
+/* Finalise a packet rebuilt from FEC data and store it in the matrix.
+ *
+ * new     - rebuilt block; this function takes ownership in every case
+ *           (stored on success, released on failure)
+ * new_idx - matrix slot the packet belongs to
+ * win     - window number in host byte order
+ *
+ * Returns 1 if the packet was stored, 0 if its size field did not match the
+ * rebuilt length. */
+static int fec_found_and_recovered(access_t *access, block_t *new, uint32_t new_idx, uint16_t win)
+{
+    struct dozer_hdr *new_hdr = (struct dozer_hdr *)new->p_buffer;
+    access_sys_t *sys = access->p_sys;
+
+    /* The first ten header bytes are not covered by the XOR; rewrite them. */
+    new_hdr->type = FEC_TYPE_DATA;
+    new_hdr->seq = htons(new_idx);
+    new_hdr->win = htons(win);
+    new_hdr->count = 0;
+
+    /* The size field is covered by the XOR, so it doubles as a sanity check
+     * of the reconstruction. */
+    if (new->i_buffer < sizeof(struct dozer_hdr)
+     || ntohs(new_hdr->size) != (new->i_buffer - sizeof(struct dozer_hdr))) {
+        msg_Err(access, "Dozer Recovery FAILED. Invalid size %u for recovered packet.", ntohs(new_hdr->size));
+        block_Release(new);
+        return 0;
+    }
+    /* win is already in host byte order here, so it is printed unswapped. */
+    msg_Dbg(access, "Dozer Packet Recovered. Cur win: %u idx: %u buffer size: %zu size stored: %u", win, new_idx, new->i_buffer, ntohs(new_hdr->size));
+    sys->matrix[new_idx] = new;
+
+    if ((new_idx == sys->fec_last_rx + 1U)) {
+        sys->fec_last_rx++;
+    }
+
+    /* Keep advancing over slots that are already filled. */
+    while (sys->fec_last_rx < (sys->n_tot - 1) && (sys->matrix[sys->fec_last_rx + 1]) ) {
+        sys->fec_last_rx++;
+    }
+    return 1;
+}
+
+/* Store a row/column FEC packet of the current window and, when exactly one
+ * data packet of the line it protects is missing, rebuild that packet.
+ *
+ * pkt is consumed: it is either kept in fec_rows/fec_cols (released later by
+ * flush_win()) or released here when it cannot be used. */
+static void fec_try_recovery(access_t *access, block_t *pkt)
+{
+    access_sys_t *sys = access->p_sys;
+    struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
+    const uint16_t pkt_win  = ntohs(hdr->win);
+    const uint16_t pkt_seq  = ntohs(hdr->seq);
+    const uint16_t pkt_type = ntohs(hdr->type);
+    block_t **slot;
+    uint32_t first, stride, count;
+
+    if (pkt_win != sys->fec_cur_win
+     || sys->matrix == NULL || sys->fec_rows == NULL || sys->fec_cols == NULL) {
+        block_Release(pkt);
+        return;
+    }
+
+    /* Describe the protected line as "count" matrix slots starting at
+     * "first" and spaced by "stride". pkt_seq comes from the network, so it
+     * is range-checked before being used as an index. */
+    if (pkt_type == FEC_TYPE_FEC_COL && pkt_seq < sys->n_cols) {
+        slot   = &sys->fec_cols[pkt_seq];
+        first  = pkt_seq;
+        stride = sys->n_cols;
+        count  = sys->n_rows;
+    } else if (pkt_type == FEC_TYPE_FEC_ROW && pkt_seq < sys->n_rows) {
+        slot   = &sys->fec_rows[pkt_seq];
+        first  = (uint32_t)pkt_seq * sys->n_cols;
+        stride = 1;
+        count  = sys->n_cols;
+    } else {
+        msg_Dbg(access, "Discarding Dozer FEC packet with type %04x seq %u", pkt_type, pkt_seq);
+        block_Release(pkt);
+        return;
+    }
+
+    /* A duplicate replaces the earlier copy instead of leaking it. */
+    if (*slot != NULL)
+        block_Release(*slot);
+    *slot = pkt;
+
+    /* Count the holes in the protected line. */
+    unsigned missing = 0;
+    uint32_t missing_idx = 0;
+    for (uint32_t k = 0; k < count; k++) {
+        const uint32_t idx = first + k * stride;
+        if (!sys->matrix[idx]) {
+            missing++;
+            missing_idx = idx;
+        }
+    }
+    if (missing != 1)
+        return; /* nothing to recover, or too many losses for one FEC packet */
+
+    block_t *rebuilt = block_Alloc(sys->mtu);
+    if (unlikely(rebuilt == NULL))
+        return;
+    memset(rebuilt->p_buffer, 0, sys->mtu);
+    rebuilt->i_buffer = 0;
+
+    /* Missing packet = FEC packet XOR every packet that did arrive. */
+    fec_xor(rebuilt, rebuilt, pkt);
+    for (uint32_t k = 0; k < count; k++) {
+        const uint32_t idx = first + k * stride;
+        if (idx != missing_idx)
+            fec_xor(rebuilt, rebuilt, sys->matrix[idx]);
+    }
+
+    /* Hands the rebuilt block over to fec_found_and_recovered(). */
+    fec_found_and_recovered(access, rebuilt, missing_idx, pkt_win);
+}
+
+/* Learn the matrix dimensions from the first FEC packets and allocate the
+ * recovery state once both the column and the row count are known.
+ *
+ * pkt is consumed: it is released, or passed on to fec_try_recovery() once
+ * the matrix exists. */
+static void process_fec_initials(access_t *access, block_t *pkt)
+{
+    access_sys_t *sys = access->p_sys;
+    struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
+    uint16_t pkt_type   = ntohs(hdr->type);
+    uint16_t pkt_count   = ntohs(hdr->count);
+
+    if (pkt_type == FEC_TYPE_FEC_COL) {
+        sys->n_cols = pkt_count;
+    }
+    if (pkt_type == FEC_TYPE_FEC_ROW) {
+        sys->n_rows = pkt_count;
+    }
+    if (sys->n_cols == 0 || sys->n_rows == 0) {
+        /* One dimension is still unknown. */
+        block_Release(pkt);
+        return;
+    }
+
+    /* n_tot is 16 bits wide: refuse dimensions whose product does not fit
+     * instead of silently truncating it. */
+    const uint32_t total = (uint32_t)sys->n_cols * sys->n_rows;
+    if (total > UINT16_MAX) {
+        msg_Err(access, "Dozer FEC matrix %ux%u is too large", sys->n_cols, sys->n_rows);
+        sys->n_cols = 0;
+        sys->n_rows = 0;
+        block_Release(pkt);
+        return;
+    }
+
+    block_t **matrix   = calloc(total, sizeof(*matrix));
+    block_t **fec_rows = calloc(sys->n_rows, sizeof(*fec_rows));
+    block_t **fec_cols = calloc(sys->n_cols, sizeof(*fec_cols));
+    if (unlikely(matrix == NULL || fec_rows == NULL || fec_cols == NULL)) {
+        /* Stay uninitialised; the next FEC packet retries the allocation. */
+        free(matrix);
+        free(fec_rows);
+        free(fec_cols);
+        block_Release(pkt);
+        return;
+    }
+
+    sys->n_tot = total;
+    sys->matrix = matrix;
+    sys->fec_rows = fec_rows;
+    sys->fec_cols = fec_cols;
+    msg_Dbg(access, "******* Initialized Dozer FEC recovery matrix %ux%u", sys->n_cols, sys->n_rows);
+    sys->fec_initialized = 1;
+    fec_try_recovery(access, pkt);
+}
+
+/* Rebuild the packet for matrix slot "missing" from one FEC packet.
+ *
+ * The protected line is "count" slots starting at "first" and spaced by
+ * "stride". Returns a new block, or NULL when another packet of the line is
+ * missing too or when allocation fails. */
+static block_t *fec_rebuild_line(access_sys_t *sys, block_t *fec,
+                                 uint32_t first, uint32_t stride,
+                                 uint32_t count, uint32_t missing)
+{
+    for (uint32_t k = 0; k < count; k++) {
+        const uint32_t cur = first + k * stride;
+        if (cur != missing && sys->matrix[cur] == NULL)
+            return NULL; /* Cannot recover: packet missing */
+    }
+
+    block_t *out = block_Alloc(sys->mtu);
+    if (!out)
+        return NULL;
+    memset(out->p_buffer, 0, sys->mtu);
+    out->i_buffer = 0;
+
+    fec_xor(out, out, fec);
+    for (uint32_t k = 0; k < count; k++) {
+        const uint32_t cur = first + k * stride;
+        if (cur != missing)
+            fec_xor(out, out, sys->matrix[cur]);
+    }
+    return out;
+}
+
+/* Try to rebuild the data packet at matrix index idx, first from its column
+ * FEC packet and then, if that is not possible, from its row FEC packet.
+ *
+ * Returns the result of fec_found_and_recovered() (1 when the packet was
+ * stored in the matrix), or 0 when no reconstruction was possible. */
+static int recover_frame(access_t *access, uint32_t idx)
+{
+    access_sys_t *sys = access->p_sys;
+    const uint32_t r = idx / sys->n_cols;
+    const uint32_t c = idx % sys->n_cols;
+    /* Single result variable: no inner declaration may shadow it, otherwise
+     * the rebuilt block is lost before the final check. */
+    block_t *new = NULL;
+
+    if (sys->fec_cols[c] != NULL) {
+        new = fec_rebuild_line(sys, sys->fec_cols[c],
+                               c, sys->n_cols, sys->n_rows, idx);
+        if (new)
+            msg_Dbg(access, "Extra recovery of packet %u via column %u\n", idx, c);
+    }
+
+    /* Only fall back to the row when the column did not yield a packet. */
+    if (new == NULL && sys->fec_rows[r] != NULL) {
+        new = fec_rebuild_line(sys, sys->fec_rows[r],
+                               r * sys->n_cols, 1, sys->n_cols, idx);
+        if (new)
+            msg_Dbg(access, "Extra recovery of packet %u via row %u\n", idx, r);
+    }
+
+    if (new == NULL) {
+        msg_Dbg(access, "Extra recovery of packet %u failed.\n", idx);
+        return 0;
+    }
+    return fec_found_and_recovered(access, new, idx, sys->fec_cur_win);
+}
+
+/* Close the current FEC window: attempt the remaining recoveries, forward
+ * every data packet that is present to the output FIFO in matrix order, and
+ * drop the window's FEC packets. Leaves all matrix and FEC slots NULL. */
+static void flush_win(access_t *access)
+{
+    uint32_t i;
+    int frame_recovered;
+    access_sys_t *sys = access->p_sys;
+
+    if (!sys->matrix || !sys->fec_rows || !sys->fec_cols)
+        return;
+
+    /* A recovered packet may complete another row or column, so repeat the
+     * pass for as long as any slot was filled. The flag accumulates over the
+     * whole pass; each success fills a slot, so the loop terminates. */
+    do {
+        frame_recovered = 0;
+        for (i = 0; i < sys->n_tot; i++) {
+            if (!sys->matrix[i] && recover_frame(access, i) > 0) {
+                frame_recovered = 1;
+            }
+        }
+    } while (frame_recovered > 0);
+
+    vlc_fifo_Lock(sys->fifo);
+    for (i = 0; i < sys->n_tot; i++) {
+        if (sys->matrix[i]) {
+            fifo_enqueue(access, sys->matrix[i]);
+            vlc_sem_post(&sys->semaphore);
+            sys->matrix[i] = NULL;
+        } else {
+            msg_Warn(access, "Warning: Dozer frame %u definitely missing!", i);
+        }
+    }
+    vlc_fifo_Unlock(sys->fifo);
+
+    /* The FEC packets never enter the FIFO, so they are released outside
+     * the FIFO lock. */
+    for (i = 0; i < sys->n_rows; i++) {
+        if (sys->fec_rows[i]) {
+            block_Release(sys->fec_rows[i]);
+            sys->fec_rows[i] = NULL;
+        }
+    }
+    for (i = 0; i < sys->n_cols; i++) {
+        if (sys->fec_cols[i]) {
+            block_Release(sys->fec_cols[i]);
+            sys->fec_cols[i] = NULL;
+        }
+    }
+}
+
+/* Place a data packet into the matrix of the current window.
+ *
+ * A packet from one of the three preceding windows is forwarded straight to
+ * the FIFO; a packet from any other window flushes the current one and
+ * starts a new window. pkt is consumed in every case.
+ *
+ * Returns the packet's sequence index, or -1 when the index does not fit the
+ * matrix and the packet was discarded. */
+static int frame_to_matrix(access_t *access, block_t *pkt)
+{
+    access_sys_t *sys = access->p_sys;
+    struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
+    const uint16_t idx = ntohs(hdr->seq);
+    const uint16_t win = ntohs(hdr->win);
+    const bool same_win = (sys->fec_cur_win == win);
+
+    if (!same_win && ((uint16_t)(sys->fec_cur_win - win)) <= 3) {
+        /* Probably old frame. Forward. */
+        msg_Dbg(access, "Received old? win: %d cur: %d", win, sys->fec_cur_win);
+        vlc_fifo_Lock(sys->fifo);
+        fifo_enqueue(access, pkt);
+        vlc_sem_post(&sys->semaphore);
+        vlc_fifo_Unlock(sys->fifo);
+        return idx;
+    }
+
+    /* idx comes from the network: never index the matrix with it unchecked. */
+    if (sys->matrix == NULL || idx >= sys->n_tot) {
+        msg_Warn(access, "Dozer frame index %u does not fit the %u-slot matrix. Discarding.", idx, sys->n_tot);
+        block_Release(pkt);
+        return -1;
+    }
+
+    if (!same_win) {
+        /* Next win started. flush_win() leaves every slot empty. */
+        flush_win(access);
+        sys->fec_cur_win = win;
+        sys->fec_last_rx = 0;
+        sys->matrix[idx] = pkt;
+        return idx;
+    }
+
+    /* A duplicate replaces the earlier copy instead of leaking it. */
+    if (sys->matrix[idx] != NULL)
+        block_Release(sys->matrix[idx]);
+    sys->matrix[idx] = pkt;
+
+    if (sys->fec_last_rx + 1 == idx) {
+        sys->fec_last_rx = idx;
+        /* Keep advancing over slots that are already filled. */
+        while ((sys->fec_last_rx < (sys->n_tot - 1)) && (sys->matrix[sys->fec_last_rx + 1] != NULL)) {
+            sys->fec_last_rx++;
+        }
+    }
+    return idx;
+}
+
+/* Entry point for every received datagram: validate it and dispatch it to
+ * the FEC or data path. pkt is consumed in every case. */
+static void process_fec(access_t *access, block_t *pkt)
+{
+    access_sys_t *sys = access->p_sys;
+
+    /* The header may only be read once the packet is known to contain one;
+     * the reader thread can hand over short or empty blocks. */
+    if (pkt->i_buffer < sizeof(struct dozer_hdr)) {
+        msg_Dbg(access, "Received short packet (%zu bytes). Discarding.", pkt->i_buffer);
+        block_Release(pkt);
+        return;
+    }
+
+    struct dozer_hdr *hdr = (struct dozer_hdr *)(pkt->p_buffer);
+    uint16_t pkt_type   = ntohs(hdr->type);
+    uint16_t pkt_sign   = ntohs(hdr->signature);
+
+    if (pkt_sign != SIGNATURE) {
+        msg_Dbg(access, "Received invalid packet with sig: %04x. Discarding.", pkt_sign);
+        block_Release(pkt);
+        return;
+    }
+    if (pkt_type != FEC_TYPE_DATA) {
+        /* FEC packet: the first ones only teach us the matrix size. */
+        if (!sys->fec_initialized)
+            process_fec_initials(access, pkt);
+        else
+            fec_try_recovery(access, pkt);
+    } else {
+        /* Data packet: unusable until the matrix size is known. */
+        if (!sys->fec_initialized)
+            block_Release(pkt);
+        else
+            frame_to_matrix(access, pkt);
+    }
+}
+
+/*****************************************************************************
+ * ThreadRead: Pull packets from socket as soon as possible.
+ 
*****************************************************************************/
+static void* ThreadRead( void *data )
+{
+    access_t *access = data;
+    access_sys_t *sys = access->p_sys;
+
+    for(;;)
+    {
+        block_t *pkt = block_Alloc(sys->mtu);
+        if (unlikely(pkt == NULL))
+        {   /* OOM - dequeue and discard one packet */
+            char dummy;
+            recv(sys->fd, &dummy, 1, 0);
+            continue;
+        }
+
+        struct iovec iov = {
+            .iov_base = pkt->p_buffer,
+            .iov_len = sys->mtu,
+        };
+        /* msg_flags starts at zero and is only meaningful after recvmsg();
+         * pre-setting MSG_TRUNC made a timeout look like a truncated packet. */
+        struct msghdr msg = {
+            .msg_iov = &iov,
+            .msg_iovlen = 1,
+        };
+        ssize_t len;
+        bool timed_out = false;
+
+        block_cleanup_push(pkt);
+        do
+        {
+            int poll_return=0;
+            struct pollfd ufd[1];
+            ufd[0].fd = sys->fd;
+            ufd[0].events = POLLIN;
+
+            while ((poll_return = poll(ufd, 1, sys->timeout)) < 0); /* cancellation point */
+            if (unlikely( poll_return == 0))
+            {
+                msg_Err( access, "Timeout on receiving, timeout %d seconds", sys->timeout/1000 );
+                atomic_store(&sys->timeout_reached, true);
+                /* Wake BlockUDP() so that it can report EOF. */
+                vlc_sem_post(&sys->semaphore);
+                timed_out = true;
+                len=0;
+                break;
+            }
+            len = recvmsg(sys->fd, &msg, 0);
+        }
+        while (len == -1);
+        vlc_cleanup_pop();
+
+        if (timed_out)
+        {
+            /* Nothing was received: drop the unused block instead of running
+             * it through the FEC code. BlockUDP() reports EOF from now on,
+             * so the thread stops reading. */
+            block_Release(pkt);
+            break;
+        }
+
+        pkt->i_buffer = len;
+#ifdef MSG_TRUNC
+        if (msg.msg_flags & MSG_TRUNC)
+        {
+            /* recvmsg() is called without flags, so len never exceeds the
+             * buffer size; sys->mtu is therefore left untouched. */
+            msg_Err(access, "%zd bytes packet truncated (MTU was %zu)",
+                    len, sys->mtu);
+            pkt->i_flags |= BLOCK_FLAG_CORRUPTED;
+        }
+#endif
+
+        process_fec(access, pkt);
+    }
+    return NULL;
+}
diff --git a/modules/access_output/Makefile.am 
b/modules/access_output/Makefile.am
index 7c93116..03deb12 100644
--- a/modules/access_output/Makefile.am
+++ b/modules/access_output/Makefile.am
@@ -6,12 +6,15 @@ libaccess_output_file_plugin_la_LIBADD = $(LIBPTHREAD)
 libaccess_output_http_plugin_la_SOURCES = access_output/http.c
 libaccess_output_udp_plugin_la_SOURCES = access_output/udp.c
 libaccess_output_udp_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
+libaccess_output_dozer_plugin_la_SOURCES = access_output/dozer.c
+libaccess_output_dozer_plugin_la_LIBADD = $(SOCKET_LIBS) $(LIBPTHREAD)
 
 access_out_LTLIBRARIES = \
 	libaccess_output_dummy_plugin.la \
 	libaccess_output_file_plugin.la \
 	libaccess_output_http_plugin.la \
-	libaccess_output_udp_plugin.la
+	libaccess_output_udp_plugin.la \
+	libaccess_output_dozer_plugin.la
 
 libaccess_output_livehttp_plugin_la_SOURCES = access_output/livehttp.c
 libaccess_output_livehttp_plugin_la_CFLAGS = $(AM_CFLAGS) $(GCRYPT_CFLAGS)
diff --git a/modules/access_output/dozer.c b/modules/access_output/dozer.c
new file mode 100644
index 0000000..87acda0
--- /dev/null
+++ b/modules/access_output/dozer.c
@@ -0,0 +1,673 @@
+/*****************************************************************************
+ * dozer.c
+ 
*****************************************************************************
+ * Copyright (C) 2016 Daniele Lacamera, Sergio Ammirata
+ * $Id$
+ *
+ * Authors: Daniele Lacamera <root at danielinux.net>
+ *          Sergio Ammirata <sergio at ammirata.net>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ 
*****************************************************************************/
+
+/*****************************************************************************
+ * Preamble
+ 
*****************************************************************************/
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_common.h>
+#include <vlc_plugin.h>
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <assert.h>
+#include <errno.h>
+
+#include <vlc_sout.h>
+#include <vlc_block.h>
+
+#ifdef _WIN32
+#   include <winsock2.h>
+#   include <ws2tcpip.h>
+#else
+#   include <sys/socket.h>
+#endif
+
+#include <vlc_network.h>
+
+#define MAX_EMPTY_BLOCKS 200
+
+#define SIGNATURE 0xD023
+
+
+/* Fec data */
+
+#define DEFAULT_DOZER_FEC_ROWS 5
+#define DEFAULT_DOZER_FEC_COLS 10
+
+#define FEC_TYPE_DATA     0x0000
+#define FEC_TYPE_FEC_ROW  0x1000
+#define FEC_TYPE_FEC_COL  0x2000
+
+static uint64_t i_seq = 0; /* Count of data packets started by Write(); the FEC window and the position
+                            * inside it are derived from this value.
+                            * NOTE(review): file scope means the counter is shared by every instance of
+                            * this module in the process and is never reset by Open() - confirm this is
+                            * intended, or move it into sout_access_out_sys_t. */
+
+struct __attribute__((packed)) dozer_hdr { /* On-wire packet header: six 16-bit fields, all stored with htons() */
+    uint16_t signature; /* always SIGNATURE */
+    uint16_t type;      /* FEC_TYPE_DATA, FEC_TYPE_FEC_ROW or FEC_TYPE_FEC_COL */
+    uint16_t seq;       /* data: index inside the FEC window; FEC: row or column index */
+    uint16_t win;       /* FEC window number, i.e. sequence / (rows * cols) */
+    uint16_t count;     /* FEC packets only: number of rows (row FEC) or of columns (column FEC) */
+    uint16_t size;      /* data: payload length, header excluded. fec_xor() starts at byte 10, which is
+                         * this field, so on FEC packets it presumably carries the XOR of the covered
+                         * sizes - TODO confirm against the receiver */
+};
+
+/*****************************************************************************
+ * Module descriptor
+ 
*****************************************************************************/
+static int  Open ( vlc_object_t * );
+static void Close( vlc_object_t * );
+
+#define SOUT_CFG_PREFIX "sout-dozer-"
+
+#define COLS_TEXT N_("FEC 2022 Columns")
+#define COLS_LONGTEXT N_( \
+    "Number of Columns to use for the FEC 2022 matrix. Default is 10 cols 
and" \
+    " 5 rows for 30% overhead and about 3% packet loss recovery" )
+
+#define ROWS_TEXT N_("FEC 2022 Rows")
+#define ROWS_LONGTEXT N_( \
+    "Number of Rows to use for the FEC 2022 matrix. Default is 5 rows and" \
+    " 10 cols for 30% overhead and about 3% packet loss recovery" )
+
+#define CACHING_TEXT N_("Caching value (ms)")
+#define CACHING_LONGTEXT N_( \
+    "Default caching value for outbound UDP Dozer streams. This " \
+    "value should be set in milliseconds." )
+
+#define GROUP_TEXT N_("Group packets")
+#define GROUP_LONGTEXT N_("Packets can be sent one by one at the right time " 
\
+                          "or by groups. You can choose the number " \
+                          "of packets that will be sent at a time. It " \
+                          "helps reducing the scheduling load on " \
+                          "heavily-loaded systems." )
+
+/* Module description shown to the user. */
+#define DOZER_LONGTEXT N_( \
+    "Dozer output module: This module provides an implementation " \
+    "of a subset of the dozer network protocol created by Sergio Ammirata. " \
+    "In this implementation, the buffer/delay size is fixed and equal to " \
+    "1316 * 8 * cols * rows / bps; For example, for a 1Mbps stream with the " \
+    "default 10x5 matrix with 30% overhead, we would have a 500ms buffer " \
+    "and an adequate recovery of up to 3% packet loss. The protocol is UDP " \
+    "based and the recovery algorithm is based on SMPTE2022." )
+
+vlc_module_begin ()
+    set_description( DOZER_LONGTEXT )
+    set_shortname( "DOZER" )
+    set_category( CAT_SOUT )
+    set_subcategory( SUBCAT_SOUT_ACO )
+    add_integer( SOUT_CFG_PREFIX "caching", DEFAULT_PTS_DELAY / 1000, 
CACHING_TEXT, CACHING_LONGTEXT, true )
+    add_integer( SOUT_CFG_PREFIX "group", 1, GROUP_TEXT, GROUP_LONGTEXT, true 
)
+    add_integer( SOUT_CFG_PREFIX "rows", DEFAULT_DOZER_FEC_ROWS, ROWS_TEXT, 
ROWS_LONGTEXT, true)
+    add_integer( SOUT_CFG_PREFIX "cols", DEFAULT_DOZER_FEC_COLS, COLS_TEXT, 
COLS_LONGTEXT, true)
+    set_capability( "sout access", 0 )
+    add_shortcut( "dozer", "dozer2022" )
+    set_callbacks( Open, Close )
+vlc_module_end ()
+
+/*****************************************************************************
+ * Exported prototypes
+ 
*****************************************************************************/
+
+static const char *const ppsz_sout_options[] = {
+    "caching",
+    "group",
+    "rows",
+    "cols",
+    NULL
+};
+
+/* Options handled by the libvlc network core */
+static const char *const ppsz_core_options[] = {
+    "dscp",
+    "ttl",
+    "miface",
+    NULL
+};
+
+static ssize_t Write   ( sout_access_out_t *, block_t * );
+static int  Seek    ( sout_access_out_t *, off_t  );
+static int Control( sout_access_out_t *, int, va_list );
+
+static void* ThreadWrite( void * );
+static block_t *NewUDPPacket( sout_access_out_t *, mtime_t );
+
+struct sout_access_out_sys_t /* Private state of one Dozer output instance */
+{
+    mtime_t       i_caching;      /* "caching" option (ms) multiplied by 1000; added to a packet's DTS to get its send date */
+    int           i_handle;       /* connected UDP socket returned by net_ConnectDgram() */
+    bool          b_mtu_warning;  /* true once the "packet size > MTU" warning has been printed */
+    size_t        i_mtu;          /* value of the "mtu" variable; packets are allocated with this size plus the header */
+
+    block_fifo_t *p_fifo;         /* packets queued by Write() for ThreadWrite() */
+    block_fifo_t *p_empty_blocks; /* blocks already sent, kept for reuse by NewUDPPacket() */
+    block_t      *p_buffer;       /* data packet currently being filled by Write(), or NULL */
+
+    block_t      **fec_rows;      /* i_rows row-FEC accumulators, refreshed by reset_fec() */
+    block_t      **fec_cols;      /* i_cols column-FEC accumulators, refreshed by reset_fec() */
+    
+    int          i_rows;          /* "rows" option */
+    int          i_cols;          /* "cols" option */
+    int          i_fecsize;       /* i_rows * i_cols: data packets per FEC window */
+
+    vlc_thread_t  thread;         /* sender thread running ThreadWrite() */
+};
+
+
+
+/*****************************************************************************
+ * FEC functions
+ 
*****************************************************************************/
+/*
+ * XOR-combines two blocks into dst, skipping the first 10 bytes.
+ *
+ * The result is as long as the longer input; past the end of the shorter
+ * input the bytes of the longer one are copied unchanged. When both inputs
+ * are shorter than 10 bytes nothing is written and dst->i_buffer is left
+ * untouched. dst may be the same block as f1 or f2: every byte is read
+ * before the byte at the same offset is written.
+ */
+static void fec_xor(block_t *dst, block_t *f1, block_t *f2)
+{
+    const uint32_t first = 10;
+    const uint32_t size_a = f1->i_buffer;
+    const uint32_t size_b = f2->i_buffer;
+    const uint32_t longest = (size_b > size_a) ? size_b : size_a;
+
+    if (longest < first)
+        return;
+
+    for (uint32_t pos = first; pos < longest; pos++) {
+        uint8_t value;
+
+        if (pos >= size_a)
+            value = f2->p_buffer[pos];
+        else if (pos >= size_b)
+            value = f1->p_buffer[pos];
+        else
+            value = f1->p_buffer[pos] ^ f2->p_buffer[pos];
+
+        dst->p_buffer[pos] = value;
+    }
+
+    dst->i_buffer = longest;
+}
+
+#define DEFAULT_PORT 1234
+
+/*****************************************************************************
+ * Open: connect the UDP socket and start the sender thread
+ 
*****************************************************************************/
+/*
+ * Module entry point.
+ *
+ * Parses the module options, connects a UDP socket to the "host[:port]"
+ * found in p_access->psz_path (DEFAULT_PORT when no port is given, IPv6
+ * literals in brackets), allocates the FEC accumulator arrays and starts the
+ * sender thread.
+ *
+ * Returns VLC_SUCCESS, VLC_ENOMEM on allocation failure, or VLC_EGENERIC for
+ * an invalid FEC matrix, a socket failure or a thread creation failure.
+ * On failure everything acquired so far is released.
+ */
+static int Open( vlc_object_t *p_this )
+{
+    sout_access_out_t       *p_access = (sout_access_out_t*)p_this;
+    sout_access_out_sys_t   *p_sys;
+
+    char                *psz_dst_addr;
+    char                *psz_parser;
+    int                 i_dst_port = DEFAULT_PORT;
+    int                 i_handle;
+    int                 i_ret = VLC_EGENERIC;
+
+    msg_Dbg( p_access, "Dozer: Called Open()");
+    config_ChainParse( p_access, SOUT_CFG_PREFIX,
+                       ppsz_sout_options, p_access->p_cfg );
+    config_ChainParse( p_access, "",
+                       ppsz_core_options, p_access->p_cfg );
+
+    if (var_Create (p_access, "dst-port", VLC_VAR_INTEGER)
+     || var_Create (p_access, "src-port", VLC_VAR_INTEGER)
+     || var_Create (p_access, "dst-addr", VLC_VAR_STRING)
+     || var_Create (p_access, "src-addr", VLC_VAR_STRING))
+    {
+        return VLC_ENOMEM;
+    }
+
+    /* The matrix size is used as a divisor and every in-window index is
+     * sent in a 16-bit field: reject empty or oversized matrices before
+     * anything is allocated. */
+    const int64_t i_rows = var_GetInteger( p_access, SOUT_CFG_PREFIX "rows" );
+    const int64_t i_cols = var_GetInteger( p_access, SOUT_CFG_PREFIX "cols" );
+    if( i_rows < 1 || i_cols < 1 || i_rows > UINT16_MAX / i_cols )
+    {
+        msg_Err( p_access, "invalid FEC matrix (%"PRId64" rows, %"PRId64
+                 " columns)", i_rows, i_cols );
+        return VLC_EGENERIC;
+    }
+
+    p_sys = malloc( sizeof( *p_sys ) );
+    if( !p_sys )
+        return VLC_ENOMEM;
+    p_access->p_sys = p_sys;
+
+    psz_dst_addr = strdup( p_access->psz_path );
+    if( !psz_dst_addr )
+    {
+        i_ret = VLC_ENOMEM;
+        goto error;
+    }
+
+    /* Skip a bracketed IPv6 literal before looking for the port separator */
+    psz_parser = psz_dst_addr;
+    if (psz_parser[0] == '[')
+        psz_parser = strchr (psz_parser, ']');
+
+    psz_parser = strchr (psz_parser ? psz_parser : psz_dst_addr, ':');
+    if (psz_parser != NULL)
+    {
+        *psz_parser++ = '\0';
+        i_dst_port = atoi (psz_parser);
+    }
+
+    i_handle = net_ConnectDgram( p_this, psz_dst_addr, i_dst_port, -1,
+                                 IPPROTO_UDP );
+    free (psz_dst_addr);
+
+    if( i_handle == -1 )
+    {
+        msg_Err( p_access, "failed to create raw UDP socket" );
+        goto error;
+    }
+
+    /* Publish the local and remote endpoints as object variables */
+    char addr[NI_MAXNUMERICHOST];
+    int port;
+
+    if (net_GetSockAddress (i_handle, addr, &port) == 0)
+    {
+        msg_Dbg (p_access, "source: %s port %d", addr, port);
+        var_SetString (p_access, "src-addr", addr);
+        var_SetInteger (p_access, "src-port", port);
+    }
+
+    if (net_GetPeerAddress (i_handle, addr, &port) == 0)
+    {
+        msg_Dbg (p_access, "destination: %s port %d", addr, port);
+        var_SetString (p_access, "dst-addr", addr);
+        var_SetInteger (p_access, "dst-port", port);
+    }
+
+    /* The socket is only ever written to */
+    shutdown( i_handle, SHUT_RD );
+
+    p_sys->i_rows = i_rows;
+    p_sys->i_cols = i_cols;
+    p_sys->i_fecsize = p_sys->i_rows * p_sys->i_cols;
+    p_sys->i_caching = UINT64_C(1000)
+                     * var_GetInteger( p_access, SOUT_CFG_PREFIX "caching");
+    p_sys->i_handle = i_handle;
+    p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" );
+    p_sys->b_mtu_warning = false;
+    p_sys->p_buffer = NULL;
+    p_sys->fec_rows = NULL;
+    p_sys->fec_cols = NULL;
+
+    i_ret = VLC_ENOMEM;
+
+    p_sys->p_fifo = block_FifoNew();
+    if( !p_sys->p_fifo )
+        goto error_socket;
+
+    p_sys->p_empty_blocks = block_FifoNew();
+    if( !p_sys->p_empty_blocks )
+        goto error_fifo;
+
+    /* Zero-filled: the slots stay NULL until reset_fec() populates them */
+    p_sys->fec_rows = calloc( p_sys->i_rows, sizeof(*p_sys->fec_rows) );
+    if( !p_sys->fec_rows )
+    {
+        msg_Err( p_access, "cannot allocate FEC matrix" );
+        goto error_empty;
+    }
+
+    p_sys->fec_cols = calloc( p_sys->i_cols, sizeof(*p_sys->fec_cols) );
+    if( !p_sys->fec_cols )
+    {
+        msg_Err( p_access, "cannot allocate FEC matrix" );
+        goto error_rows;
+    }
+
+    if( vlc_clone( &p_sys->thread, ThreadWrite, p_access,
+                           VLC_THREAD_PRIORITY_HIGHEST ) )
+    {
+        msg_Err( p_access, "cannot spawn sout access thread" );
+        i_ret = VLC_EGENERIC;
+        goto error_cols;
+    }
+
+    p_access->pf_write = Write;
+    p_access->pf_seek = Seek;
+    p_access->pf_control = Control;
+
+    return VLC_SUCCESS;
+
+    /* Unwind in reverse order of acquisition; p_sys is freed last because
+     * every other resource is reached through it. */
+error_cols:
+    free( p_sys->fec_cols );
+error_rows:
+    free( p_sys->fec_rows );
+error_empty:
+    block_FifoRelease( p_sys->p_empty_blocks );
+error_fifo:
+    block_FifoRelease( p_sys->p_fifo );
+error_socket:
+    net_Close( i_handle );
+error:
+    free( p_sys );
+    return i_ret;
+}
+
+/*****************************************************************************
+ * Close: close the target
+ 
*****************************************************************************/
+/*
+ * Module exit point: stops the sender thread, then releases the FIFOs, the
+ * packet under construction, the FEC accumulator arrays, the socket and the
+ * private state.
+ */
+static void Close( vlc_object_t * p_this )
+{
+    sout_access_out_t     *p_access = (sout_access_out_t*)p_this;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+
+    /* The thread uses the FIFOs and the socket: it must be gone before
+     * any of them is released. */
+    vlc_cancel( p_sys->thread );
+    vlc_join( p_sys->thread, NULL );
+    block_FifoRelease( p_sys->p_fifo );
+    block_FifoRelease( p_sys->p_empty_blocks );
+
+    if( p_sys->p_buffer ) block_Release( p_sys->p_buffer );
+
+    /* Only the arrays are freed. NOTE(review): a slot may still point to a
+     * block that was already queued (and released with the FIFOs above), so
+     * the blocks themselves cannot safely be released from here; FEC blocks
+     * of an unfinished window are therefore not reclaimed. */
+    free( p_sys->fec_rows );
+    free( p_sys->fec_cols );
+
+    net_Close( p_sys->i_handle );
+    free( p_sys );
+}
+
+/*
+ * Control callback. Only ACCESS_OUT_CONTROLS_PACE is handled: the bool the
+ * caller passed is set to false. Every other query yields VLC_EGENERIC.
+ */
+static int Control( sout_access_out_t *p_access, int i_query, va_list args )
+{
+    (void)p_access;
+
+    if( i_query != ACCESS_OUT_CONTROLS_PACE )
+        return VLC_EGENERIC;
+
+    bool *pb_answer = va_arg( args, bool * );
+    *pb_answer = false;
+
+    return VLC_SUCCESS;
+}
+
+/*****************************************************************************
+ * Write: FEC bookkeeping and packing of the stream into Dozer datagrams.
+ 
*****************************************************************************/
+
+/*
+ * Starts a new FEC window: every row and column slot receives a fresh,
+ * empty packet from NewUDPPacket(), all stamped with the current date.
+ * The previous slot contents are overwritten without being released.
+ */
+static void reset_fec(sout_access_out_t *p_access)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    const mtime_t stamp = mdate();
+
+    for (int r = 0; r < p_sys->i_rows; r++)
+        p_sys->fec_rows[r] = NewUDPPacket(p_access, stamp);
+
+    for (int c = 0; c < p_sys->i_cols; c++)
+        p_sys->fec_cols[c] = NewUDPPacket(p_access, stamp);
+}
+
+/*
+ * Accumulates a data packet into the FEC packet of its matrix row.
+ *
+ * p_buffer is the finished data packet and seq its absolute sequence number.
+ * A new FEC window is opened (reset_fec) when seq is the first packet of a
+ * window.
+ *
+ * Returns the completed row FEC packet, header filled in, once the last
+ * column of the row has been accumulated; the caller then owns it. Returns
+ * NULL otherwise, including when no accumulator is available for the row.
+ */
+static block_t *process_fec_row(sout_access_out_t *p_access,
+                                block_t *p_buffer, uint64_t seq)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    uint16_t win = seq / p_sys->i_fecsize;
+    uint16_t iseq = seq % p_sys->i_fecsize;
+    uint16_t row = iseq / p_sys->i_cols;
+    uint16_t col = iseq % p_sys->i_cols;
+
+    if (iseq == 0) {
+        reset_fec(p_access);
+    }
+
+    /* The slot is NULL until the first reset_fec() of this instance (the
+     * sequence counter is file-scope, so a stream may start in the middle
+     * of a window): skip FEC for that partial window instead of crashing. */
+    block_t *fec = p_sys->fec_rows[row];
+    if (fec == NULL)
+        return NULL;
+
+    /* Update row fec */
+    fec_xor(fec, fec, p_buffer);
+
+    /* Not the last packet of the row yet: nothing to send */
+    if (col != (p_sys->i_cols - 1))
+        return NULL;
+
+    struct dozer_hdr *hdr = (struct dozer_hdr *)fec->p_buffer;
+    hdr->signature = htons(SIGNATURE);
+    hdr->type = htons(FEC_TYPE_FEC_ROW);
+    hdr->win = htons(win);
+    hdr->seq = htons(row);
+    hdr->count = htons(p_sys->i_rows);
+    fec->i_dts = p_buffer->i_dts;
+
+    /* Ownership moves to the caller: do not keep a pointer to it */
+    p_sys->fec_rows[row] = NULL;
+    return fec;
+}
+
+/*
+ * Accumulates a data packet into the FEC packet of its matrix column.
+ *
+ * p_buffer is the finished data packet and seq its absolute sequence number.
+ *
+ * Returns the completed column FEC packet, header filled in, once the last
+ * row of the column has been accumulated; the caller then owns it. Returns
+ * NULL otherwise, including when no accumulator is available for the column.
+ */
+static block_t *process_fec_col(sout_access_out_t *p_access,
+                                block_t *p_buffer, uint64_t seq)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    uint16_t win = seq / p_sys->i_fecsize;
+    uint16_t iseq = seq % p_sys->i_fecsize;
+    uint16_t row = iseq / p_sys->i_cols;
+    uint16_t col = iseq % p_sys->i_cols;
+
+    /* The slot is NULL until the first window reset of this instance (the
+     * sequence counter is file-scope, so a stream may start in the middle
+     * of a window): skip FEC for that partial window instead of crashing. */
+    block_t *fec = p_sys->fec_cols[col];
+    if (fec == NULL)
+        return NULL;
+
+    /* Update col fec */
+    fec_xor(fec, fec, p_buffer);
+
+    /* Not the last packet of the column yet: nothing to send */
+    if (row != (p_sys->i_rows - 1))
+        return NULL;
+
+    struct dozer_hdr *hdr = (struct dozer_hdr *)fec->p_buffer;
+    hdr->signature = htons(SIGNATURE);
+    hdr->type = htons(FEC_TYPE_FEC_COL);
+    hdr->win = htons(win);
+    hdr->seq = htons(col);
+    hdr->count = htons(p_sys->i_cols);
+    fec->i_dts = p_buffer->i_dts;
+
+    /* Ownership moves to the caller: do not keep a pointer to it */
+    p_sys->fec_cols[col] = NULL;
+    return fec;
+}
+
+/*
+ * Finalizes the packet under construction: stores its payload size in the
+ * header, feeds it to the row and column FEC accumulators and queues it for
+ * the sender thread, followed by any FEC packet it completed.
+ * p_sys->p_buffer must not be NULL; it is NULL on return.
+ */
+static void flush_packet( sout_access_out_t *p_access )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    block_t *p_pkt = p_sys->p_buffer;
+    struct dozer_hdr *hdr = (struct dozer_hdr *)p_pkt->p_buffer;
+    block_t *fec_r, *fec_c;
+
+    hdr->size = htons(p_pkt->i_buffer - sizeof(struct dozer_hdr));
+    /* i_seq was incremented when the packet was started */
+    fec_r = process_fec_row(p_access, p_pkt, i_seq - 1);
+    fec_c = process_fec_col(p_access, p_pkt, i_seq - 1);
+    block_FifoPut( p_sys->p_fifo, p_pkt );
+    p_sys->p_buffer = NULL;
+    if (fec_r)
+        block_FifoPut(p_sys->p_fifo, fec_r);
+    if (fec_c)
+        block_FifoPut(p_sys->p_fifo, fec_c);
+}
+
+/*
+ * Packs a chain of blocks into Dozer data packets and queues them for the
+ * sender thread.
+ *
+ * Each input block is copied into the packet under construction; a packet
+ * is flushed when the next block would not fit, when it is full, or when an
+ * input block had to be split. Every input block is released.
+ *
+ * Returns the total number of input bytes accepted.
+ */
+static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    /* Size of the buffer handed out by NewUDPPacket() */
+    const size_t i_capacity = p_sys->i_mtu + sizeof(struct dozer_hdr);
+    int i_len = 0;
+
+    while( p_buffer )
+    {
+        block_t *p_next;
+        int i_packets = 0;
+        mtime_t now = mdate();
+
+        if( !p_sys->b_mtu_warning && p_buffer->i_buffer > p_sys->i_mtu )
+        {
+            msg_Warn( p_access, "packet size > MTU, you should probably "
+                      "increase the MTU" );
+            p_sys->b_mtu_warning = true;
+        }
+
+        /* Check if there is enough space in the buffer */
+        if( p_sys->p_buffer &&
+            p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu )
+        {
+            if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
+            {
+                msg_Dbg( p_access, "late packet for UDP input (%"PRId64 ")",
+                         now - p_sys->p_buffer->i_dts
+                          - p_sys->i_caching );
+            }
+            flush_packet( p_access );
+        }
+
+        i_len += p_buffer->i_buffer;
+        while( p_buffer->i_buffer )
+        {
+            i_packets++;
+
+            if( !p_sys->p_buffer )
+            {
+                p_sys->p_buffer = NewUDPPacket( p_access, p_buffer->i_dts );
+                if( !p_sys->p_buffer ) break;
+
+                /* Populate header */
+                struct dozer_hdr *hdr =
+                    (struct dozer_hdr *)(p_sys->p_buffer->p_buffer);
+                hdr->signature = htons(SIGNATURE);
+                hdr->type = htons(FEC_TYPE_DATA);
+                hdr->seq = htons(i_seq % p_sys->i_fecsize);
+                hdr->win = htons((i_seq / p_sys->i_fecsize) & 0xFFFF);
+                i_seq++;
+                p_sys->p_buffer->i_buffer += sizeof(struct dozer_hdr);
+            }
+
+            /* Never copy more than what is left in the packet: an input
+             * block larger than the MTU is split over several packets. */
+            size_t i_room = i_capacity - p_sys->p_buffer->i_buffer;
+            size_t i_write = __MIN( p_buffer->i_buffer, i_room );
+
+            memcpy( p_sys->p_buffer->p_buffer + p_sys->p_buffer->i_buffer,
+                    p_buffer->p_buffer, i_write );
+
+            p_sys->p_buffer->i_buffer += i_write;
+            p_buffer->p_buffer += i_write;
+            p_buffer->i_buffer -= i_write;
+            if ( p_buffer->i_flags & BLOCK_FLAG_CLOCK )
+            {
+                if ( p_sys->p_buffer->i_flags & BLOCK_FLAG_CLOCK )
+                    msg_Warn( p_access, "putting two PCRs at once" );
+                p_sys->p_buffer->i_flags |= BLOCK_FLAG_CLOCK;
+            }
+
+            /* A packet filled to capacity must leave now, otherwise the
+             * next copy would have no room at all. */
+            if( p_sys->p_buffer->i_buffer == p_sys->i_mtu
+             || p_sys->p_buffer->i_buffer >= i_capacity
+             || i_packets > 1 )
+            {
+                /* Flush */
+                if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
+                {
+                    msg_Dbg( p_access, "late packet for udp input (%"PRId64 ")",
+                             mdate() - p_sys->p_buffer->i_dts
+                              - p_sys->i_caching );
+                }
+                flush_packet( p_access );
+            }
+        }
+
+        p_next = p_buffer->p_next;
+        block_Release( p_buffer );
+        p_buffer = p_next;
+    }
+    return i_len;
+}
+
+/*****************************************************************************
+ * Seek: seek to a specific location in a file
+ 
*****************************************************************************/
+/*
+ * Seek callback: a datagram stream has no position to move to, so the
+ * request is logged as an error and -1 is returned.
+ */
+static int Seek( sout_access_out_t *p_access, off_t i_pos )
+{
+    msg_Err( p_access, "UDP sout access cannot seek" );
+    (void) i_pos;
+
+    return -1;
+}
+
+/*****************************************************************************
+ * NewUDPPacket: get an empty packet of p_sys->i_mtu bytes plus the Dozer header
+ 
*****************************************************************************/
+/*
+ * Returns an empty packet able to hold p_sys->i_mtu bytes plus a Dozer
+ * header, with its DTS set to i_dts and its length set to 0.
+ *
+ * A block is taken from the pool of already-sent blocks when one is
+ * available, otherwise a new one is allocated. The pool is first trimmed
+ * down to MAX_EMPTY_BLOCKS entries.
+ *
+ * Returns NULL when the block cannot be allocated.
+ */
+static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
+{
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    const size_t i_size = p_sys->i_mtu + sizeof(struct dozer_hdr);
+    block_t *p_buffer;
+
+    while ( block_FifoCount( p_sys->p_empty_blocks ) > MAX_EMPTY_BLOCKS )
+    {
+        p_buffer = block_FifoGet( p_sys->p_empty_blocks );
+        block_Release( p_buffer );
+    }
+
+    if( block_FifoCount( p_sys->p_empty_blocks ) == 0 )
+    {
+        p_buffer = block_Alloc( i_size );
+    }
+    else
+    {
+        p_buffer = block_FifoGet(p_sys->p_empty_blocks );
+        p_buffer->i_flags = 0;
+        p_buffer = block_Realloc( p_buffer, 0, i_size );
+    }
+
+    /* Both allocation paths can fail; callers handle a NULL result */
+    if( p_buffer == NULL )
+        return NULL;
+
+    p_buffer->i_dts = i_dts;
+    p_buffer->i_buffer = 0;
+
+    return p_buffer;
+}
+
+/*****************************************************************************
+ * ThreadWrite: write each queued packet to the network at the right time.
+ 
*****************************************************************************/
+/*
+ * Sender thread: takes packets from p_sys->p_fifo, waits for their send
+ * date (DTS plus caching delay) and writes them to the socket, then hands
+ * the block back to the pool of empty blocks.
+ *
+ * The wait is done once every "group" packets, and for every packet
+ * flagged BLOCK_FLAG_CLOCK. A packet dated more than 2 seconds after the
+ * previous one is dropped. The loop never exits by itself; Close() cancels
+ * the thread.
+ */
+static void* ThreadWrite( void *data )
+{
+    sout_access_out_t *p_access = data;
+    sout_access_out_sys_t *p_sys = p_access->p_sys;
+    mtime_t i_date_last = -1;
+    const unsigned i_group = var_GetInteger( p_access,
+                                             SOUT_CFG_PREFIX "group" );
+    mtime_t i_to_send = i_group;
+    unsigned i_dropped_packets = 0;
+    msg_Dbg( p_access, "Dozer: Sending thread started.");
+
+    for (;;)
+    {
+        block_t *p_pk = block_FifoGet( p_sys->p_fifo );
+        mtime_t       i_date, i_sent;
+
+        i_date = p_sys->i_caching + p_pk->i_dts;
+        if( i_date_last > 0 )
+        {
+            if( i_date - i_date_last > 2000000 )
+            {
+                /* Only the first packet of a run of drops is logged */
+                if( !i_dropped_packets )
+                    msg_Dbg( p_access, "mmh, hole (%"PRId64" > 2s) -> drop",
+                             i_date - i_date_last );
+
+                block_FifoPut( p_sys->p_empty_blocks, p_pk );
+
+                i_date_last = i_date;
+                i_dropped_packets++;
+                continue;
+            }
+            else if( i_date - i_date_last < -1000 )
+            {
+                if( !i_dropped_packets )
+                    msg_Dbg( p_access, "mmh, packets in the past (%"PRId64")",
+                             i_date_last - i_date );
+            }
+        }
+
+        /* The block must be released if the thread is cancelled while
+         * waiting or sending */
+        block_cleanup_push( p_pk );
+        i_to_send--;
+        if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
+        {
+            mwait( i_date );
+            i_to_send = i_group;
+        }
+        if ( send( p_sys->i_handle, p_pk->p_buffer, p_pk->i_buffer, 0 ) == -1 )
+            msg_Warn( p_access, "send error: %s", vlc_strerror_c(errno) );
+        vlc_cleanup_pop();
+
+        if( i_dropped_packets )
+        {
+            /* i_dropped_packets is unsigned: print it with %u */
+            msg_Dbg( p_access, "dropped %u packets", i_dropped_packets );
+            i_dropped_packets = 0;
+        }
+
+        i_sent = mdate();
+        if ( i_sent > i_date + 20000 )
+        {
+            msg_Dbg( p_access, "packet has been sent too late (%"PRId64 ")",
+                     i_sent - i_date );
+        }
+
+        block_FifoPut( p_sys->p_empty_blocks, p_pk );
+
+        i_date_last = i_date;
+    }
+    return NULL;
+}
-- 
2.8.1

-- 
Rémi Denis-Courmont
http://www.remlab.net/



More information about the vlc-devel mailing list