[x265] [PATCH] input: streamline control logic of threaded file readers

Steve Borho steve at borho.org
Wed Jul 2 01:38:50 CEST 2014


# HG changeset patch
# User Steve Borho <steve at borho.org>
# Date 1404254848 18000
#      Tue Jul 01 17:47:28 2014 -0500
# Node ID 0a86d7a81008bc7344a0c6052d67bfae55dd6edf
# Parent  a18972fd05b1d6242a881bef979b9e1ff17543d9
input: streamline control logic of threaded file readers

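These readers were written before ThreadSafeInteger existed, which made their
control logic over-complicated.  Replacing the head/tail indices and per-slot
status flags with monotonic readCount/writeCount counters lets that logic be
greatly simplified, and lets the two readers' control flows be re-unified so
they mirror each other.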

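Also drop the PPA profiling events (file reading is of little interest when
profiling), and allocate the YUV reader's frame buffers with X265_MALLOC /
X265_FREE, as the Y4M reader does.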

diff -r a18972fd05b1 -r 0a86d7a81008 source/input/y4m.cpp
--- a/source/input/y4m.cpp	Tue Jul 01 14:58:35 2014 -0500
+++ b/source/input/y4m.cpp	Tue Jul 01 17:47:28 2014 -0500
@@ -22,7 +22,6 @@
  *****************************************************************************/
 
 #include "y4m.h"
-#include "PPA/ppa.h"
 #include "common.h"
 
 #include <iostream>
@@ -44,14 +43,11 @@
 
 Y4MInput::Y4MInput(InputFileInfo& info)
 {
-    for (uint32_t i = 0; i < QUEUE_SIZE; i++)
-    {
-        plane[i][2] = plane[i][1] = plane[i][0] = NULL;
-        frameStat[i] = false;
-    }
+    for (int i = 0; i < QUEUE_SIZE; i++)
+        buf[i] = NULL;
 
-    head.set(0);
-    tail.set(0);
+    readCount.set(0);
+    writeCount.set(0);
 
     threadActive = false;
     colorSpace = info.csp;
@@ -62,6 +58,7 @@
     rateNum = info.fpsNum;
     rateDenom = info.fpsDenom;
     depth = info.depth;
+    framesize = 0;
 
     ifs = NULL;
     if (!strcmp(info.filename, "-"))
@@ -74,29 +71,24 @@
     else
         ifs = new ifstream(info.filename, ios::binary | ios::in);
 
-    uint32_t bytesPerPixel = 1;
-    size_t frameSize = strlen(header) + 1;
     if (ifs && ifs->good() && parseHeader())
     {
-        bytesPerPixel = depth > 8 ? 2 : 1;
+        int pixelbytes = depth > 8 ? 2 : 1;
         for (int i = 0; i < x265_cli_csps[colorSpace].planes; i++)
         {
-            plane_stride[i] = (uint32_t)(width >> x265_cli_csps[colorSpace].width[i]) * bytesPerPixel;
-            plane_size[i] =   (uint32_t)(plane_stride[i] * (height >> x265_cli_csps[colorSpace].height[i]));
-            frameSize += plane_size[i];
+            int stride = (width >> x265_cli_csps[colorSpace].width[i]) * pixelbytes;
+            framesize += (stride * (height >> x265_cli_csps[colorSpace].height[i]));
         }
+
         threadActive = true;
-        for (uint32_t q = 0; q < QUEUE_SIZE && threadActive; q++)
+        for (int q = 0; q < QUEUE_SIZE; q++)
         {
-            for (int i = 0; i < x265_cli_csps[colorSpace].planes; i++)
+            buf[q] = X265_MALLOC(char, framesize);
+            if (!buf[q])
             {
-                plane[q][i] = X265_MALLOC(char, plane_size[i]);
-                if (!plane[q][i])
-                {
-                    x265_log(NULL, X265_LOG_ERROR, "y4m: buffer allocation failure, aborting");
-                    threadActive = false;
-                    break;
-                }
+                x265_log(NULL, X265_LOG_ERROR, "y4m: buffer allocation failure, aborting");
+                threadActive = false;
+                break;
             }
         }
     }
@@ -118,6 +110,8 @@
     info.depth = depth;
     info.frameCount = -1;
 
+    int estFrameSize = framesize + strlen(header) + 1; /* assume basic FRAME\n headers */
+
     /* try to estimate frame count, if this is not stdin */
     if (ifs != &cin)
     {
@@ -132,7 +126,7 @@
         {
             LARGE_INTEGER size;
             if (GetFileSizeEx(hFile, &size))
-                info.frameCount = (int)((size.QuadPart - (int64_t)cur) / frameSize);
+                info.frameCount = (int)((size.QuadPart - (int64_t)cur) / estFrameSize);
             CloseHandle(hFile);
         }
 #else // if defined(_MSC_VER) && _MSC_VER < 1700
@@ -142,7 +136,7 @@
             istream::pos_type size = ifs->tellg();
             ifs->seekg(cur, ios::beg);
             if (size > 0)
-                info.frameCount = (int)((size - cur) / frameSize);
+                info.frameCount = (int)((size - cur) / estFrameSize);
         }
 #endif // if defined(_MSC_VER) && _MSC_VER < 1700
     }
@@ -150,12 +144,10 @@
     if (info.skipFrames)
     {
 #if X86_64
-        ifs->seekg((uint64_t)frameSize * info.skipFrames, ios::cur);
+        ifs->seekg((uint64_t)estFrameSize * info.skipFrames, ios::cur);
 #else
         for (int i = 0; i < info.skipFrames; i++)
-        {
-            ifs->ignore(frameSize);
-        }
+            ifs->ignore(estFrameSize);
 #endif
     }
 }
@@ -165,13 +157,16 @@
     if (ifs && ifs != &cin)
         delete ifs;
 
-    for (uint32_t i = 0; i < QUEUE_SIZE; i++)
-    {
-        for (int j = 0; j < x265_cli_csps[colorSpace].planes; j++)
-        {
-            x265_free(plane[i][j]);
-        }
-    }
+    for (int i = 0; i < QUEUE_SIZE; i++)
+        X265_FREE(buf[i]);
+}
+
+void Y4MInput::release() // stop the file reader thread, then destroy this object
+{
+    threadActive = false; // a reader blocked in populateFrameQueue() returns false once woken; NOTE(review): plain bool shared with the reader thread, consider an atomic
+    readCount.set(readCount.get()); // unblock file reader: the unchanged value is re-stored only to wake a waiter in populateFrameQueue()
+    stop(); // NOTE(review): presumably joins the reader thread (defined elsewhere) -- confirm
+    delete this; // the object must not be used after release() returns
 }
 
 bool Y4MInput::parseHeader()
@@ -387,26 +382,63 @@
     }
     while (threadActive);
 
-    /* "open the throttle" at the end, allow reader to consume
-     * remaining valid queue entries */
     threadActive = false;
-    tail.set(QUEUE_SIZE);
+    writeCount.set(writeCount.get()); // unblock readPicture
+}
+
+bool Y4MInput::populateFrameQueue() // read one y4m frame (FRAME line + pixel data) into the ring; false on end of stream, error, or release()
+{
+    if (!ifs || ifs->fail()) // no stream, or an earlier read already failed
+        return false;
+
+    /* strip off the FRAME header */
+    char hbuf[sizeof(header)]; // assumes header is a char array (e.g. "FRAME") -- TODO confirm its declaration
+
+    ifs->read(hbuf, strlen(header));
+    if (ifs->eof()) // stream ended while reading the header: treated as normal end of input, nothing logged
+        return false;
+
+    if (!ifs->good() || memcmp(hbuf, header, strlen(header)))
+    {
+        x265_log(NULL, X265_LOG_ERROR, "y4m: frame header missing\n");
+        return false;
+    }
+
+    /* consume bytes up to line feed; the rest of the FRAME line is discarded */
+    int c = ifs->get();
+    while (c != '\n' && ifs->good())
+        c = ifs->get();
+
+    /* wait for room in the ring buffer. At most QUEUE_SIZE - 1 frames are held, so the slot most recently returned by readPicture() is not overwritten until readPicture() is called again */
+    int written = writeCount.get();
+    int read = readCount.get();
+    while (written - read > QUEUE_SIZE - 2)
+    {
+        read = readCount.waitForChange(read); // NOTE(review): if release() clears threadActive and signals between the loop test and this wait, and waitForChange() only wakes on a later set(), the wake-up is lost and stop() could hang -- confirm ThreadSafeInteger semantics
+        if (!threadActive) // release() has been called
+            return false;
+    }
+
+    ifs->read(buf[written % QUEUE_SIZE], framesize);
+    if (ifs->good()) // publish only a complete frame; a truncated final frame is dropped
+    {
+        writeCount.incr(); // makes the frame visible to readPicture()
+        return true;
+    }
+    else
+        return false;
 }
 
 bool Y4MInput::readPicture(x265_picture& pic)
 {
-    PPAStartCpuEventFunc(read_yuv);
-    int curHead = head.get();
-    int curTail = tail.get();
+    int read = readCount.get(); // frames consumed so far (monotonic counter; ring slot is read % QUEUE_SIZE)
+    int written = writeCount.get(); // frames fully published by populateFrameQueue()
 
 #if ENABLE_THREADING
 
-    while (curHead == curTail)
-    {
-        curTail = tail.waitForChange(curTail);
-        if (!threadActive)
-            break;
-    }
+    /* only wait if the read thread is still active. NOTE(review): threadActive is tested outside writeCount's lock; if the reader thread exits (clears threadActive, re-sets writeCount) between this test and waitForChange(), the wake-up may be missed and this call could block -- confirm ThreadSafeInteger semantics */
+    while (threadActive && read == written)
+        written = writeCount.waitForChange(written);
 
 #else
 
@@ -414,71 +446,21 @@
 
 #endif // if ENABLE_THREADING
 
-    if (!frameStat[curHead])
+    if (read < written) // a published frame is available; otherwise the reader has finished
+    {
+        int pixelbytes = depth > 8 ? 2 : 1;
+        pic.bitDepth = depth;
+        pic.colorSpace = colorSpace;
+        pic.stride[0] = width * pixelbytes;
+        pic.stride[1] = pic.stride[0] >> x265_cli_csps[colorSpace].width[1]; // NOTE(review): shifting the luma byte stride differs from the constructor's (width >> shift) * pixelbytes when depth > 8 and width is odd (width 5: 5 vs 4 bytes), which also misplaces planes[2]; the removed code used the constructor's formula
+        pic.stride[2] = pic.stride[0] >> x265_cli_csps[colorSpace].width[2];
+        pic.planes[0] = buf[read % QUEUE_SIZE]; // planes point into the ring slot; valid until the next readPicture() call
+        pic.planes[1] = (char*)pic.planes[0] + pic.stride[0] * height;
+        pic.planes[2] = (char*)pic.planes[1] + pic.stride[1] * (height >> x265_cli_csps[colorSpace].height[1]);
+        readCount.incr(); // lets the reader reuse the slot returned by the previous call
+        return true;
+    }
+    else
         return false;
-    frameStat[curHead] = false;
-
-    pic.bitDepth = depth;
-    pic.colorSpace = colorSpace;
-    for (int i = 0; i < x265_cli_csps[colorSpace].planes; i++)
-    {
-        pic.planes[i] = plane[curHead][i];
-        pic.stride[i] = plane_stride[i];
-    }
-
-    head.set((curHead + 1) % QUEUE_SIZE);
-
-    PPAStopCpuEventFunc(read_yuv);
-    return true;
 }
 
-bool Y4MInput::populateFrameQueue()
-{
-    /* strip off the FRAME header */
-    char hbuf[sizeof(header)];
-
-    if (!ifs)
-        return false;
-
-    ifs->read(hbuf, strlen(header));
-    if (!ifs->good())
-        return false;
-
-    if (memcmp(hbuf, header, strlen(header)))
-    {
-        x265_log(NULL, X265_LOG_ERROR, "y4m: frame header missing\n");
-        return false;
-    }
-    /* consume bytes up to line feed */
-    int c = ifs->get();
-    while (c != '\n' && ifs->good())
-    {
-        c = ifs->get();
-    }
-
-    int curTail = tail.get();
-    int curHead = head.get();
-    while ((curTail + 1) % QUEUE_SIZE == curHead)
-    {
-        curHead = head.waitForChange(curHead);
-        if (!threadActive)
-            return false;
-    }
-
-    for (int i = 0; i < x265_cli_csps[colorSpace].planes; i++)
-    {
-        ifs->read(plane[curTail][i], plane_size[i]);
-    }
-
-    frameStat[curTail] = !ifs->fail();
-    tail.set((curTail + 1) % QUEUE_SIZE);
-    return !ifs->fail();
-}
-
-void Y4MInput::release()
-{
-    threadActive = false;
-    head.set(QUEUE_SIZE); // unblock file reader
-    stop();
-    delete this;
-}
diff -r a18972fd05b1 -r 0a86d7a81008 source/input/y4m.h
--- a/source/input/y4m.h	Tue Jul 01 14:58:35 2014 -0500
+++ b/source/input/y4m.h	Tue Jul 01 17:47:28 2014 -0500
@@ -45,6 +45,8 @@
 
     uint32_t sarHeight;
 
+    size_t framesize;
+
     int depth;
 
     int width;
@@ -53,19 +55,13 @@
 
     int colorSpace;
 
-    uint32_t plane_size[3];
-
-    uint32_t plane_stride[3];
-
     bool threadActive;
 
-    ThreadSafeInteger head;
+    ThreadSafeInteger readCount;
 
-    ThreadSafeInteger tail;
+    ThreadSafeInteger writeCount;
 
-    bool frameStat[QUEUE_SIZE];
-
-    char* plane[QUEUE_SIZE][3];
+    char* buf[QUEUE_SIZE];
 
     std::istream *ifs;
 
diff -r a18972fd05b1 -r 0a86d7a81008 source/input/yuv.cpp
--- a/source/input/yuv.cpp	Tue Jul 01 14:58:35 2014 -0500
+++ b/source/input/yuv.cpp	Tue Jul 01 17:47:28 2014 -0500
@@ -22,7 +22,6 @@
  *****************************************************************************/
 
 #include "yuv.h"
-#include "PPA/ppa.h"
 #include "common.h"
 
 #include <iostream>
@@ -43,14 +42,10 @@
 YUVInput::YUVInput(InputFileInfo& info)
 {
     for (int i = 0; i < QUEUE_SIZE; i++)
-    {
         buf[i] = NULL;
-        frameStat[i] = false;
-    }
 
-    head.set(0);
-    tail.set(0);
-    framesize = 0;
+    readCount.set(0);
+    writeCount.set(0);
     depth = info.depth;
     width = info.width;
     height = info.height;
@@ -58,6 +53,15 @@
     threadActive = false;
     ifs = NULL;
 
+    uint32_t pixelbytes = depth > 8 ? 2 : 1;
+    framesize = 0;
+    for (int i = 0; i < x265_cli_csps[colorSpace].planes; i++)
+    {
+        uint32_t w = width >> x265_cli_csps[colorSpace].width[i];
+        uint32_t h = height >> x265_cli_csps[colorSpace].height[i];
+        framesize += w * h * pixelbytes;
+    }
+
     if (width == 0 || height == 0 || info.fpsNum == 0 || info.fpsDenom == 0)
     {
         x265_log(NULL, X265_LOG_ERROR, "yuv: width, height, and FPS must be specified\n");
@@ -84,17 +88,9 @@
         return;
     }
 
-    pixelbytes = depth > 8 ? 2 : 1;
-    for (int i = 0; i < x265_cli_csps[colorSpace].planes; i++)
-    {
-        uint32_t w = width >> x265_cli_csps[colorSpace].width[i];
-        uint32_t h = height >> x265_cli_csps[colorSpace].height[i];
-        framesize += w * h * pixelbytes;
-    }
-
     for (uint32_t i = 0; i < QUEUE_SIZE; i++)
     {
-        buf[i] = new char[framesize];
+        buf[i] = X265_MALLOC(char, framesize);
         if (buf[i] == NULL)
         {
             x265_log(NULL, X265_LOG_ERROR, "yuv: buffer allocation failure, aborting\n");
@@ -140,9 +136,7 @@
         ifs->seekg((uint64_t)framesize * info.skipFrames, ios::cur);
 #else
         for (int i = 0; i < info.skipFrames; i++)
-        {
             ifs->ignore(framesize);
-        }
 #endif
     }
 }
@@ -152,15 +146,13 @@
     if (ifs && ifs != &cin)
         delete ifs;
     for (int i = 0; i < QUEUE_SIZE; i++)
-    {
-        delete[] buf[i];
-    }
+        X265_FREE(buf[i]);
 }
 
 void YUVInput::release()
 {
     threadActive = false;
-    head.set(QUEUE_SIZE);
+    readCount.set(readCount.get()); // unblock read thread: the unchanged value is re-stored only to wake a waiter in populateFrameQueue(), which then sees threadActive == false
     stop();
     delete this;
 }
@@ -168,7 +160,7 @@
 void YUVInput::startReader()
 {
 #if ENABLE_THREADING
-    if (ifs && threadActive)
+    if (threadActive) // NOTE(review): the ifs check was dropped; this relies on threadActive being set only after ifs is open and the buffers are allocated -- confirm in the constructor
         start();
 #endif
 }
@@ -182,43 +174,45 @@
     }
 
     threadActive = false;
-    tail.set(QUEUE_SIZE);
+    writeCount.set(writeCount.get()); // unblock readPicture
 }
 
 bool YUVInput::populateFrameQueue()
 {
-    int curTail = tail.get();
-    int curHead = head.get();
+    if (!ifs || ifs->fail()) // no stream, or an earlier read already failed
+        return false;
 
-    while ((curTail + 1) % QUEUE_SIZE == curHead)
+    /* wait for room in the ring buffer. At most QUEUE_SIZE - 1 frames are held, so the slot most recently returned by readPicture() is not overwritten until readPicture() is called again */
+    int written = writeCount.get();
+    int read = readCount.get();
+    while (written - read > QUEUE_SIZE - 2)
     {
-        curHead = head.waitForChange(curHead);
+        read = readCount.waitForChange(read); // NOTE(review): if release() clears threadActive and signals between the loop test and this wait, and waitForChange() only wakes on a later set(), the wake-up is lost and stop() could hang -- confirm ThreadSafeInteger semantics
         if (!threadActive)
+            // release() has been called; give up without reading
             return false;
     }
 
-    PPAStartCpuEventFunc(read_yuv);
-    ifs->read(buf[curTail], framesize);
-    frameStat[curTail] = !ifs->fail();
-    tail.set((curTail + 1) % QUEUE_SIZE);
-    PPAStopCpuEventFunc(read_yuv);
-
-    return !ifs->fail();
+    ifs->read(buf[written % QUEUE_SIZE], framesize);
+    if (ifs->good()) // publish only a complete frame; a truncated final frame is dropped
+    {
+        writeCount.incr(); // makes the frame visible to readPicture()
+        return true;
+    }
+    else
+        return false;
 }
 
 bool YUVInput::readPicture(x265_picture& pic)
 {
-    int curHead = head.get();
-    int curTail = tail.get();
+    int read = readCount.get(); // frames consumed so far (monotonic counter; ring slot is read % QUEUE_SIZE)
+    int written = writeCount.get(); // frames fully published by populateFrameQueue()
 
 #if ENABLE_THREADING
 
-    while (curHead == curTail)
-    {
-        curTail = tail.waitForChange(curTail);
-        if (!threadActive)
-            break;
-    }
+    /* only wait if the read thread is still active. NOTE(review): threadActive is tested outside writeCount's lock; if the reader thread exits (clears threadActive, re-sets writeCount) between this test and waitForChange(), the wake-up may be missed and this call could block -- confirm ThreadSafeInteger semantics */
+    while (threadActive && read == written)
+        written = writeCount.waitForChange(written);
 
 #else
 
@@ -226,20 +220,20 @@
 
 #endif // if ENABLE_THREADING
 
-    if (!frameStat[curHead])
+    if (read < written) // a published frame is available; otherwise the reader has finished
+    {
+        uint32_t pixelbytes = depth > 8 ? 2 : 1;
+        pic.colorSpace = colorSpace;
+        pic.bitDepth = depth;
+        pic.stride[0] = width * pixelbytes;
+        pic.stride[1] = pic.stride[0] >> x265_cli_csps[colorSpace].width[1]; // NOTE(review): predates this patch, but disagrees with the constructor's framesize, which uses (width >> shift) * pixelbytes, when depth > 8 and width is odd
+        pic.stride[2] = pic.stride[0] >> x265_cli_csps[colorSpace].width[2];
+        pic.planes[0] = buf[read % QUEUE_SIZE]; // planes point into the ring slot; valid until the next readPicture() call
+        pic.planes[1] = (char*)pic.planes[0] + pic.stride[0] * height;
+        pic.planes[2] = (char*)pic.planes[1] + pic.stride[1] * (height >> x265_cli_csps[colorSpace].height[1]);
+        readCount.incr(); // lets the reader reuse the slot returned by the previous call
+        return true;
+    }
+    else
         return false;
-    frameStat[curHead] = false;
-
-    pic.colorSpace = colorSpace;
-    pic.bitDepth = depth;
-    pic.stride[0] = width * pixelbytes;
-    pic.stride[1] = pic.stride[0] >> x265_cli_csps[colorSpace].width[1];
-    pic.stride[2] = pic.stride[0] >> x265_cli_csps[colorSpace].width[2];
-    pic.planes[0] = buf[curHead];
-    pic.planes[1] = (char*)pic.planes[0] + pic.stride[0] * height;
-    pic.planes[2] = (char*)pic.planes[1] + pic.stride[1] * (height >> x265_cli_csps[colorSpace].height[1]);
-
-    head.set((curHead + 1) % QUEUE_SIZE);
-
-    return true;
 }
diff -r a18972fd05b1 -r 0a86d7a81008 source/input/yuv.h
--- a/source/input/yuv.h	Tue Jul 01 14:58:35 2014 -0500
+++ b/source/input/yuv.h	Tue Jul 01 17:47:28 2014 -0500
@@ -45,17 +45,13 @@
 
     uint32_t depth;
 
-    uint32_t pixelbytes;
-
     uint32_t framesize;
 
     bool threadActive;
 
-    ThreadSafeInteger head;
+    ThreadSafeInteger readCount;
 
-    ThreadSafeInteger tail;
-
-    bool frameStat[QUEUE_SIZE];
+    ThreadSafeInteger writeCount;
 
     char* buf[QUEUE_SIZE];
 


More information about the x265-devel mailing list