[x265] [PATCH] threading: introduce ThreadSafeInteger class

Steve Borho steve at borho.org
Mon Mar 17 21:28:04 CET 2014


# HG changeset patch
# User Steve Borho <steve at borho.org>
# Date 1395088047 18000
#      Mon Mar 17 15:27:27 2014 -0500
# Node ID 921f6642623d77895fdd6e5f2ea27188eee2ee5f
# Parent  8d5deb7cafd83ac554489deb8577f905a0cda6b3
threading: introduce ThreadSafeInteger class

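This class uses a mutex and a condition variable to implement a
producer/consumer access protocol, with a single writer and multiple readers,
for safe synchronization between CPU cores. FrameEncoders now block directly
on a reference picture's m_reconRowCount instead of being woken through
Encoder::signalReconRowCompleted().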

diff -r 8d5deb7cafd8 -r 921f6642623d source/Lib/TLibCommon/TComPic.cpp
--- a/source/Lib/TLibCommon/TComPic.cpp	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/Lib/TLibCommon/TComPic.cpp	Mon Mar 17 15:27:27 2014 -0500
@@ -66,7 +66,7 @@
     , m_cuCostsForVbv(NULL)
     , m_intraCuCostsForVbv(NULL)
 {
-    m_reconRowCount = 0;
+    m_reconRowCount.set(0);
     m_countRefEncoders = 0;
     memset(&m_lowres, 0, sizeof(m_lowres));
     m_next = NULL;
diff -r 8d5deb7cafd8 -r 921f6642623d source/Lib/TLibCommon/TComPic.h
--- a/source/Lib/TLibCommon/TComPic.h	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/Lib/TLibCommon/TComPic.h	Mon Mar 17 15:27:27 2014 -0500
@@ -77,7 +77,7 @@
 public:
 
     //** Frame Parallelism - notification between FrameEncoders of available motion reference rows **
-    volatile uint32_t     m_reconRowCount;      // count of CTU rows completely reconstructed and extended for motion reference
+    ThreadSafeInteger     m_reconRowCount;      // count of CTU rows completely reconstructed and extended for motion reference
     volatile uint32_t     m_countRefEncoders;   // count of FrameEncoder threads monitoring m_reconRowCount
     void*                 m_userData;           // user provided pointer passed in with this picture
 
diff -r 8d5deb7cafd8 -r 921f6642623d source/common/threading.h
--- a/source/common/threading.h	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/common/threading.h	Mon Mar 17 15:27:27 2014 -0500
@@ -204,6 +204,69 @@
     HANDLE handle;
 };
 
+/* This class is intended for use in signaling state changes safely between CPU
+ * cores. One thread should be a writer and multiple threads may be readers. The
+ * mutex's main purpose is to serve as a memory fence to ensure writes made by
+ * the writer thread are visible prior to readers seeing the m_val change. Its
+ * secondary purpose is for use with the condition variable for blocking waits.
+ * Every access to m_val, including the value returned by waitForChange(), is
+ * made while holding the lock. */
+class ThreadSafeInteger
+{
+public:
+
+    ThreadSafeInteger()
+    {
+        m_val = 0;
+        InitializeCriticalSection(&m_cs);
+        InitializeConditionVariable(&m_cv);
+    }
+
+    ~ThreadSafeInteger()
+    {
+        /* Win32 CONDITION_VARIABLEs have no destroy function; only the
+         * critical section needs to be released */
+        DeleteCriticalSection(&m_cs);
+    }
+
+    /* If m_val still equals prev, block until the next set() (or a spurious
+     * wakeup). Returns m_val as read under the lock; the result may equal
+     * prev, so callers must re-test their condition in a loop */
+    int waitForChange(int prev)
+    {
+        EnterCriticalSection(&m_cs);
+        if (m_val == prev)
+            SleepConditionVariableCS(&m_cv, &m_cs, INFINITE);
+        int ret = m_val; // read before unlocking to avoid a race with set()
+        LeaveCriticalSection(&m_cs);
+        return ret;
+    }
+
+    /* Return the current value, read under the lock */
+    int get() const
+    {
+        EnterCriticalSection(&m_cs);
+        int ret = m_val;
+        LeaveCriticalSection(&m_cs);
+        return ret;
+    }
+
+    /* Store a new value and wake every thread blocked in waitForChange() */
+    void set(int newval)
+    {
+        EnterCriticalSection(&m_cs);
+        m_val = newval;
+        WakeAllConditionVariable(&m_cv);
+        LeaveCriticalSection(&m_cs);
+    }
+
+protected:
+
+    mutable CRITICAL_SECTION m_cs;  // mutable so that get() can remain const
+    CONDITION_VARIABLE       m_cv;
+    int                      m_val;
+};
+
 #else /* POSIX / pthreads */
 
 typedef pthread_t ThreadHandle;
@@ -314,6 +377,71 @@
     uint32_t        m_counter;
 };
 
+/* This class is intended for use in signaling state changes safely between CPU
+ * cores. One thread should be a writer and multiple threads may be readers. The
+ * mutex's main purpose is to serve as a memory fence to ensure writes made by
+ * the writer thread are visible prior to readers seeing the m_val change. Its
+ * secondary purpose is for use with the condition variable for blocking waits.
+ * Every access to m_val, including the value returned by waitForChange(), is
+ * made while holding the lock. */
+class ThreadSafeInteger
+{
+public:
+
+    ThreadSafeInteger()
+    {
+        m_val = 0;
+        if (pthread_mutex_init(&m_mutex, NULL) ||
+            pthread_cond_init(&m_cond, NULL))
+        {
+            x265_log(NULL, X265_LOG_ERROR, "fatal: unable to initialize conditional variable\n");
+        }
+    }
+
+    ~ThreadSafeInteger()
+    {
+        pthread_cond_destroy(&m_cond);
+        pthread_mutex_destroy(&m_mutex);
+    }
+
+    /* If m_val still equals prev, block until the next set() (or a spurious
+     * wakeup). Returns m_val as read under the lock; the result may equal
+     * prev, so callers must re-test their condition in a loop */
+    int waitForChange(int prev)
+    {
+        pthread_mutex_lock(&m_mutex);
+        if (m_val == prev)
+            pthread_cond_wait(&m_cond, &m_mutex);
+        int ret = m_val; // read before unlocking to avoid a race with set()
+        pthread_mutex_unlock(&m_mutex);
+        return ret;
+    }
+
+    /* Return the current value, read under the lock */
+    int get() const
+    {
+        pthread_mutex_lock(&m_mutex);
+        int ret = m_val;
+        pthread_mutex_unlock(&m_mutex);
+        return ret;
+    }
+
+    /* Store a new value and wake every thread blocked in waitForChange() */
+    void set(int newval)
+    {
+        pthread_mutex_lock(&m_mutex);
+        m_val = newval;
+        pthread_cond_broadcast(&m_cond);
+        pthread_mutex_unlock(&m_mutex);
+    }
+
+protected:
+
+    mutable pthread_mutex_t m_mutex;  // mutable so that get() can remain const
+    pthread_cond_t          m_cond;
+    int                     m_val;
+};
+
 #endif // ifdef _WIN32
 
 class ScopedLock
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/dpb.cpp
--- a/source/encoder/dpb.cpp	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/dpb.cpp	Mon Mar 17 15:27:27 2014 -0500
@@ -52,7 +52,7 @@
         iterPic = iterPic->m_next;
         if (pic->getSlice()->isReferenced() == false && pic->m_countRefEncoders == 0)
         {
-            pic->m_reconRowCount = 0;
+            pic->m_reconRowCount.set(0);
             pic->m_bChromaPlanesExtended = false;
 
             // iterator is invalidated by remove, restart scan
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/encoder.cpp
--- a/source/encoder/encoder.cpp	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/encoder.cpp	Mon Mar 17 15:27:27 2014 -0500
@@ -187,20 +187,6 @@
     m_encodeStartTime = x265_mdate();
 }
 
-/* Called when a frame encoder has completed a CTU row of reconstructed
- * pixels and extended them; making them available for use as reference.
- * It calls this function with its POC, so the top encoder may waken any
- * other frame encoders that were waiting for a row of recon pixels from
- * this picture */
-void Encoder::signalReconRowCompleted(int poc)
-{
-    for (int i = 0; i < param->frameNumThreads; i++)
-    {
-        if (m_frameEncoder[i].m_blockRefPOC == poc)
-            m_frameEncoder[i].m_reconRowWait.trigger();
-    }
-}
-
 int Encoder::getStreamHeaders(NALUnitEBSP **nalunits)
 {
     return m_frameEncoder->getStreamHeaders(nalunits);
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/frameencoder.cpp
--- a/source/encoder/frameencoder.cpp	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/frameencoder.cpp	Mon Mar 17 15:27:27 2014 -0500
@@ -55,7 +55,6 @@
         m_nalList[i] = NULL;
     }
 
-    m_blockRefPOC = -1;
     m_nalCount = 0;
     m_totalTime = 0;
     memset(&m_rce, 0, sizeof(RateControlEntry));
@@ -698,7 +697,6 @@
             ATOMIC_DEC(&refpic->m_countRefEncoders);
         }
     }
-    m_top->signalReconRowCompleted(m_pic->getPOC());
 
     m_pic->m_elapsedCompressTime = (double)(x265_mdate() - startCompressTime) / 1000000;
     delete[] outStreams;
@@ -915,7 +913,7 @@
     range    += 1;                        /* diamond search range check lag */
     range    += 2;                        /* subpel refine */
     range    += NTAPS_LUMA / 2;           /* subpel filter half-length */
-    uint32_t refLagRows = 1 + ((range + g_maxCUSize - 1) / g_maxCUSize);
+    int refLagRows = 1 + ((range + g_maxCUSize - 1) / g_maxCUSize);
     int numPredDir = slice->isInterP() ? 1 : slice->isInterB() ? 2 : 0;
 
     m_pic->m_SSDY = 0;
@@ -929,7 +927,7 @@
         WaveFront::clearEnabledRowMask();
         WaveFront::enqueue();
 
-        for (uint32_t row = 0; row < (uint32_t)m_numRows; row++)
+        for (int row = 0; row < m_numRows; row++)
         {
             // block until all reference frames have reconstructed the rows we need
             for (int l = 0; l < numPredDir; l++)
@@ -938,16 +936,9 @@
                 {
                     TComPic *refpic = slice->getRefPic(l, ref);
 
-                    /* indicate which reference picture we might wait for,
-                     * prior to checking recon row count */
-                    m_blockRefPOC = refpic->getPOC();
-                    while ((refpic->m_reconRowCount != (uint32_t)m_numRows) &&
-                           (refpic->m_reconRowCount < row + refLagRows))
-                    {
-                        m_reconRowWait.wait();
-                    }
-
-                    m_blockRefPOC = -1;
+                    int reconRowCount = refpic->m_reconRowCount.get();
+                    while ((reconRowCount != m_numRows) && (reconRowCount < row + refLagRows))
+                        reconRowCount = refpic->m_reconRowCount.waitForChange(reconRowCount);
 
                     if (slice->getPPS()->getUseWP() && slice->getSliceType() == P_SLICE && m_mref[l][ref].isWeighted)
                     {
@@ -982,16 +973,9 @@
                     {
                         TComPic *refpic = slice->getRefPic(list, ref);
 
-                        /* indicate which reference picture we might wait for,
-                         * prior to checking recon row count */
-                        m_blockRefPOC = refpic->getPOC();
-                        while ((refpic->m_reconRowCount != (uint32_t)m_numRows) &&
-                               (refpic->m_reconRowCount < i + refLagRows))
-                        {
-                            m_reconRowWait.wait();
-                        }
-
-                        m_blockRefPOC = -1;
+                        int reconRowCount = refpic->m_reconRowCount.get();
+                        while ((reconRowCount != m_numRows) && (reconRowCount < i + refLagRows))
+                            reconRowCount = refpic->m_reconRowCount.waitForChange(reconRowCount);
 
                         if (slice->getPPS()->getUseWP() && slice->getSliceType() == P_SLICE && m_mref[l][ref].isWeighted)
                         {
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/frameencoder.h
--- a/source/encoder/frameencoder.h	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/frameencoder.h	Mon Mar 17 15:27:27 2014 -0500
@@ -112,10 +112,6 @@
         }
     }
 
-    volatile int m_blockRefPOC;
-
-    Event        m_reconRowWait;
-
     TEncEntropy* getEntropyCoder(int row)      { return &this->m_rows[row].m_entropyCoder; }
 
     TEncSbac*    getSbacCoder(int row)         { return &this->m_rows[row].m_sbacCoder; }
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/framefilter.cpp
--- a/source/encoder/framefilter.cpp	Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/framefilter.cpp	Mon Mar 17 15:27:27 2014 -0500
@@ -263,8 +263,7 @@
     }
 
     // Notify other FrameEncoders that this row of reconstructed pixels is available
-    m_pic->m_reconRowCount++;
-    cfg->signalReconRowCompleted(m_pic->getPOC());
+    m_pic->m_reconRowCount.set(m_pic->m_reconRowCount.get() + 1);
 
     int cuAddr = lineStartCUAddr;
     if (m_param->bEnablePsnr)


More information about the x265-devel mailing list