[multicat-devel] [PATCH] Allow seamless reads and DTS/PTS manipulation

Dr. Net! - Eugen Rieck eugen at drnet.at
Thu Nov 17 16:02:30 CET 2016


Dear multicat devs,

First, let me introduce myself: my name is Eugen, and I am from Vienna, Austria. We use multicat in a TS playout box feeding DVB heads, for which we needed to patch multicat; we are now hoping to have the tested results upstreamed.


The more important first part deals with an issue where multicat doesn't behave as expected when reading from a FIFO while the sending side is still in the middle of posting a packet: this race can result in multicat reading just a part of a block and then complaining "warning: invalid TS packet (sync=0x..)"

To reproduce the issue, run something like

mkfifo playout.ts
multicat -u -U -C -P playout.ts /dev/null

and in another console

(sk=0 ; while true; do dd if=some-valid-file.ts bs=5000 count=1 skip=$sk ; sk=$(($sk+1)) ; sleep 0.5 ; done) > playout.ts

I introduced a -M flag (to complement -m) that makes stream_Read loop until a complete payload chunk has been read. This completely fixed the issue for us, i.e. running "multicat -M -u -U -C -P playout.ts /dev/null" works as expected.


The less important second part adds the -o (for DTS) and -O (for PTS) flags for use with restamping (-P). They allow offsets to be applied to DTS/PTS values to accommodate e.g. slow DVB heads.

All the best,

Eugen


--- multicat.ori.c    2016-11-17 14:43:27.088402240 +0100
+++ multicat.c    2016-11-17 14:51:57.726030179 +0100
@@ -79,6 +79,7 @@
 static in_addr_t i_ssrc = 0;
 static bool b_input_udp = false, b_output_udp = false;
 static size_t i_asked_payload_size = DEFAULT_PAYLOAD_SIZE;
+static bool b_full_blocks = false;
 static size_t i_rtp_header_size = RTP_HEADER_SIZE;
 static uint64_t i_rotate_size = DEFAULT_ROTATE_SIZE;
 static uint64_t i_duration = 0;
@@ -89,6 +90,8 @@
 static uint64_t i_last_pcr_date;
 static uint64_t i_last_pcr = TS_CLOCK_MAX;
 static uint64_t i_pcr_offset;
+static uint64_t i_dts_offset;
+static uint64_t i_pts_offset;
 
 static volatile sig_atomic_t b_die = 0, b_error = 0;
 static uint16_t i_rtp_seqnum;
@@ -105,7 +108,7 @@
 
 static void usage(void)
 {
-    msg_Raw( NULL, "Usage: multicat [-i <RT priority>] [-l <syslogtag>] [-t <ttl>] [-X] [-T <file name>] [-f] [-p <PCR PID>] [-C] [-P] [-s <chunks>] [-n <chunks>] [-k <start time>] [-d <duration>] [-a] [-r <file duration>] [-S <SSRC IP>] [-u] [-U] [-m <payload size>] [-R <RTP header size>] [-w] <input item> <output item>" );
+    msg_Raw( NULL, "Usage: multicat [-i <RT priority>] [-l <syslogtag>] [-t <ttl>] [-X] [-T <file name>] [-f] [-p <PCR PID>] [-C] [-P] [-o <dtsoffset>] [-O <ptsoffset>] [-s <chunks>] [-n <chunks>] [-k <start time>] [-d <duration>] [-a] [-r <file duration>] [-S <SSRC IP>] [-u] [-U] [-m <payload size>] [-R <RTP header size>] [-w] <input item> <output item>" );
     msg_Raw( NULL, "    item format: <file path | device path | FIFO path | directory path | network host>" );
     msg_Raw( NULL, "    host format: [<connect addr>[:<connect port>]][@[<bind addr][:<bind port>]]" );
     msg_Raw( NULL, "    -X: also pass-through all packets to stdout" );
@@ -114,6 +117,8 @@
     msg_Raw( NULL, "    -p: overwrite or create RTP timestamps using PCR PID (MPEG-2/TS)" );
     msg_Raw( NULL, "    -C: rewrite continuity counters to be continuous" );
     msg_Raw( NULL, "    -P: restamp PCRs, DTSs, and PTSs" );
+    msg_Raw( NULL, "    -o: When restamping, add this offset to DTSs (in 27 MHz units)" );
+    msg_Raw( NULL, "    -O: When restamping, add this offset to PTSs (in 27 MHz units)" );
     msg_Raw( NULL, "    -s: skip the first N chunks of payload [deprecated]" );
     msg_Raw( NULL, "    -n: exit after playing N chunks of payload [deprecated]" );
     msg_Raw( NULL, "    -k: start at the given position (in 27 MHz units, negative = from the end)" );
@@ -124,6 +129,7 @@
     msg_Raw( NULL, "    -u: source has no RTP header" );
     msg_Raw( NULL, "    -U: destination has no RTP header" );
     msg_Raw( NULL, "    -m: size of the payload chunk, excluding optional RTP header (default 1316)" );
+    msg_Raw( NULL, "    -M: If input is a FIFO or a character device, read only complete payload chunks" );
     msg_Raw( NULL, "    -R: size of the optional RTP header (default 12)" );
     msg_Raw( NULL, "    -w: send with RAW (needed for /srcaddr)" );
     exit(EXIT_FAILURE);
@@ -361,18 +367,27 @@
     if ( !i_stream_nb_skips && !i_first_stc )
         i_first_stc = pf_Date();
 
-    if ( !Poll() )
-    {
-        i_stc = pf_Date();
-        return 0;
-    }
+    ssize_t i_done = 0;
+    i_ret = 0;
+    while ( i_done < i_len ) {
 
-    if ( (i_ret = read( i_input_fd, p_buf, i_len )) < 0 )
-    {
-        msg_Err( NULL, "read error (%s)", strerror(errno) );
-        b_die = b_error = 1;
-        return 0;
+        if ( !Poll() )
+        {
+            i_stc = pf_Date();
+            return 0;
+        }
+
+        if ( (i_ret = read( i_input_fd, p_buf+i_done, i_len-i_done )) < 0 )
+        {
+            msg_Err( NULL, "read error (%s)", strerror(errno) );
+            b_die = b_error = 1;
+            return 0;
+        }
+        if ( i_ret == 0 ) break;
+        i_done += i_ret;
+        if ( !b_full_blocks ) break;
     }
+    i_ret = i_done;
 
     i_stc = pf_Date();
     if ( i_stream_nb_skips )
@@ -893,14 +908,14 @@
                 /* disable the check as this is a common mistake */
                 /* && pes_validate_pts(p_buffer + header_size) */) {
                 pes_set_pts(p_buffer + header_size,
-                        RestampTS(pes_get_pts(p_buffer + header_size) * 300) /
+                        RestampTS((pes_get_pts(p_buffer + header_size) + i_pts_offset) * 300) /
                         300);
 
                 if (header_size + PES_HEADER_SIZE_PTSDTS <= TS_SIZE &&
                     pes_has_dts(p_buffer + header_size)
                     /* && pes_validate_dts(p_buffer + header_size) */)
                     pes_set_dts(p_buffer + header_size,
-                        RestampTS(pes_get_dts(p_buffer + header_size) * 300) /
+                        RestampTS((pes_get_dts(p_buffer + header_size) + i_dts_offset) * 300) /
                         300);
             }
         }
@@ -927,9 +942,11 @@
     int c;
     struct sigaction sa;
     sigset_t set;
+    i_dts_offset=0;
+    i_pts_offset=0;
 
     /* Parse options */
-    while ( (c = getopt( i_argc, pp_argv, "i:l:t:XT:fp:CPs:n:k:d:ar:S:uUm:R:wh" )) != -1 )
+    while ( (c = getopt( i_argc, pp_argv, "i:l:t:XT:fp:CPo:O:s:n:k:d:ar:S:uUm:MR:wh" )) != -1 )
     {
         switch ( c )
         {
@@ -973,6 +990,14 @@
             b_restamp = true;
             break;
 
+        case 'o':
+            i_dts_offset = strtoll( optarg, NULL, 0 );
+            break;
+
+        case 'O':
+            i_pts_offset = strtoll( optarg, NULL, 0 );
+            break;
+
         case 's':
             i_skip_chunks = strtol( optarg, NULL, 0 );
             break;
@@ -1019,6 +1044,10 @@
             i_asked_payload_size = strtol( optarg, NULL, 0 );
             break;
 
+        case 'M':
+            b_full_blocks = true;
+            break;
+
         case 'R':
             i_rtp_header_size = strtol( optarg, NULL, 0 );
             break;


-- 

:-) Eugen Rieck (-:
  eugen at drnet.at

The greatest test of an engineer is not his technical ingenuity 
but his ability to persuade those in power who do not want to be persuaded
and convince those for whom the evidence of their own eyes is anything but convincing.



More information about the multicat-devel mailing list