[x265] [PATCH] threading: introduce ThreadSafeInteger class
Steve Borho
steve at borho.org
Mon Mar 17 21:28:04 CET 2014
# HG changeset patch
# User Steve Borho <steve at borho.org>
# Date 1395088047 18000
# Mon Mar 17 15:27:27 2014 -0500
# Node ID 921f6642623d77895fdd6e5f2ea27188eee2ee5f
# Parent 8d5deb7cafd83ac554489deb8577f905a0cda6b3
threading: introduce ThreadSafeInteger class
This class uses a mutex and a condition variable to implement a producer/consumer
access protocol, with a single writer and multiple readers, for safe multi-core
synchronization.
diff -r 8d5deb7cafd8 -r 921f6642623d source/Lib/TLibCommon/TComPic.cpp
--- a/source/Lib/TLibCommon/TComPic.cpp Mon Mar 17 00:47:24 2014 -0500
+++ b/source/Lib/TLibCommon/TComPic.cpp Mon Mar 17 15:27:27 2014 -0500
@@ -66,7 +66,7 @@
, m_cuCostsForVbv(NULL)
, m_intraCuCostsForVbv(NULL)
{
- m_reconRowCount = 0;
+ m_reconRowCount.set(0);
m_countRefEncoders = 0;
memset(&m_lowres, 0, sizeof(m_lowres));
m_next = NULL;
diff -r 8d5deb7cafd8 -r 921f6642623d source/Lib/TLibCommon/TComPic.h
--- a/source/Lib/TLibCommon/TComPic.h Mon Mar 17 00:47:24 2014 -0500
+++ b/source/Lib/TLibCommon/TComPic.h Mon Mar 17 15:27:27 2014 -0500
@@ -77,7 +77,7 @@
public:
//** Frame Parallelism - notification between FrameEncoders of available motion reference rows **
- volatile uint32_t m_reconRowCount; // count of CTU rows completely reconstructed and extended for motion reference
+ ThreadSafeInteger m_reconRowCount; // count of CTU rows completely reconstructed and extended for motion reference
volatile uint32_t m_countRefEncoders; // count of FrameEncoder threads monitoring m_reconRowCount
void* m_userData; // user provided pointer passed in with this picture
diff -r 8d5deb7cafd8 -r 921f6642623d source/common/threading.h
--- a/source/common/threading.h Mon Mar 17 00:47:24 2014 -0500
+++ b/source/common/threading.h Mon Mar 17 15:27:27 2014 -0500
@@ -204,6 +204,60 @@
HANDLE handle;
};
+/* This class is intended for use in signaling state changes safely between CPU
+ * cores. One thread should be a writer and multiple threads may be readers. The
+ * mutex's main purpose is to serve as a memory fence to ensure writes made by
+ * the writer thread are visible prior to readers seeing the m_val change. Its
+ * secondary purpose is for use with the condition variable for blocking waits */
+class ThreadSafeInteger
+{
+public:
+    ThreadSafeInteger() : m_val(0)
+    {
+        InitializeCriticalSection(&m_cs);
+        InitializeConditionVariable(&m_cv);
+    }
+
+    /* Win32 condition variables have no destroy call; only m_cs is released */
+    ~ThreadSafeInteger()
+    {
+        DeleteCriticalSection(&m_cs);
+    }
+
+    /* Sleep until signaled if the value still equals prev, then return the value
+     * read under the lock. It may still equal prev, so callers must loop */
+    int waitForChange(int prev)
+    {
+        EnterCriticalSection(&m_cs);
+        if (m_val == prev)
+            SleepConditionVariableCS(&m_cv, &m_cs, INFINITE);
+        int ret = m_val;
+        LeaveCriticalSection(&m_cs);
+        return ret;
+    }
+
+    int get() const // returns the current value, read under the lock
+    {
+        EnterCriticalSection(&m_cs);
+        int ret = m_val;
+        LeaveCriticalSection(&m_cs);
+        return ret;
+    }
+
+    /* Store newval and wake every thread blocked in waitForChange() */
+    void set(int newval)
+    {
+        EnterCriticalSection(&m_cs);
+        m_val = newval;
+        WakeAllConditionVariable(&m_cv);
+        LeaveCriticalSection(&m_cs);
+    }
+protected:
+    mutable CRITICAL_SECTION m_cs; // mutable: the const get() must lock it
+    CONDITION_VARIABLE m_cv;
+    int m_val;
+};
+
#else /* POSIX / pthreads */
typedef pthread_t ThreadHandle;
@@ -314,6 +368,64 @@
uint32_t m_counter;
};
+
+/* This class is intended for use in signaling state changes safely between CPU
+ * cores. One thread should be a writer and multiple threads may be readers. The
+ * mutex's main purpose is to serve as a memory fence to ensure writes made by
+ * the writer thread are visible prior to readers seeing the m_val change. Its
+ * secondary purpose is for use with the condition variable for blocking waits */
+class ThreadSafeInteger
+{
+public:
+    ThreadSafeInteger() : m_val(0)
+    {
+        if (pthread_mutex_init(&m_mutex, NULL) ||
+            pthread_cond_init(&m_cond, NULL))
+        {
+            x265_log(NULL, X265_LOG_ERROR, "fatal: unable to initialize conditional variable\n");
+        }
+    }
+
+    ~ThreadSafeInteger()
+    {
+        pthread_cond_destroy(&m_cond);
+        pthread_mutex_destroy(&m_mutex);
+    }
+
+    /* Sleep until signaled if the value still equals prev, then return the value
+     * read under the lock. It may still equal prev, so callers must loop */
+    int waitForChange(int prev)
+    {
+        pthread_mutex_lock(&m_mutex);
+        if (m_val == prev)
+            pthread_cond_wait(&m_cond, &m_mutex);
+        int ret = m_val;
+        pthread_mutex_unlock(&m_mutex);
+        return ret;
+    }
+
+    int get() const // returns the current value, read under the lock
+    {
+        pthread_mutex_lock(&m_mutex);
+        int ret = m_val;
+        pthread_mutex_unlock(&m_mutex);
+        return ret;
+    }
+
+    /* Store newval and wake every thread blocked in waitForChange() */
+    void set(int newval)
+    {
+        pthread_mutex_lock(&m_mutex);
+        m_val = newval;
+        pthread_cond_broadcast(&m_cond);
+        pthread_mutex_unlock(&m_mutex);
+    }
+protected:
+    mutable pthread_mutex_t m_mutex; // mutable: the const get() must lock it
+    pthread_cond_t m_cond;
+    int m_val;
+};
+
#endif // ifdef _WIN32
class ScopedLock
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/dpb.cpp
--- a/source/encoder/dpb.cpp Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/dpb.cpp Mon Mar 17 15:27:27 2014 -0500
@@ -52,7 +52,7 @@
iterPic = iterPic->m_next;
if (pic->getSlice()->isReferenced() == false && pic->m_countRefEncoders == 0)
{
- pic->m_reconRowCount = 0;
+ pic->m_reconRowCount.set(0);
pic->m_bChromaPlanesExtended = false;
// iterator is invalidated by remove, restart scan
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/encoder.cpp
--- a/source/encoder/encoder.cpp Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/encoder.cpp Mon Mar 17 15:27:27 2014 -0500
@@ -187,20 +187,6 @@
m_encodeStartTime = x265_mdate();
}
-/* Called when a frame encoder has completed a CTU row of reconstructed
- * pixels and extended them; making them available for use as reference.
- * It calls this function with its POC, so the top encoder may waken any
- * other frame encoders that were waiting for a row of recon pixels from
- * this picture */
-void Encoder::signalReconRowCompleted(int poc)
-{
- for (int i = 0; i < param->frameNumThreads; i++)
- {
- if (m_frameEncoder[i].m_blockRefPOC == poc)
- m_frameEncoder[i].m_reconRowWait.trigger();
- }
-}
-
int Encoder::getStreamHeaders(NALUnitEBSP **nalunits)
{
return m_frameEncoder->getStreamHeaders(nalunits);
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/frameencoder.cpp
--- a/source/encoder/frameencoder.cpp Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/frameencoder.cpp Mon Mar 17 15:27:27 2014 -0500
@@ -55,7 +55,6 @@
m_nalList[i] = NULL;
}
- m_blockRefPOC = -1;
m_nalCount = 0;
m_totalTime = 0;
memset(&m_rce, 0, sizeof(RateControlEntry));
@@ -698,7 +697,6 @@
ATOMIC_DEC(&refpic->m_countRefEncoders);
}
}
- m_top->signalReconRowCompleted(m_pic->getPOC());
m_pic->m_elapsedCompressTime = (double)(x265_mdate() - startCompressTime) / 1000000;
delete[] outStreams;
@@ -915,7 +913,7 @@
range += 1; /* diamond search range check lag */
range += 2; /* subpel refine */
range += NTAPS_LUMA / 2; /* subpel filter half-length */
- uint32_t refLagRows = 1 + ((range + g_maxCUSize - 1) / g_maxCUSize);
+ int refLagRows = 1 + ((range + g_maxCUSize - 1) / g_maxCUSize);
int numPredDir = slice->isInterP() ? 1 : slice->isInterB() ? 2 : 0;
m_pic->m_SSDY = 0;
@@ -929,7 +927,7 @@
WaveFront::clearEnabledRowMask();
WaveFront::enqueue();
- for (uint32_t row = 0; row < (uint32_t)m_numRows; row++)
+ for (int row = 0; row < m_numRows; row++)
{
// block until all reference frames have reconstructed the rows we need
for (int l = 0; l < numPredDir; l++)
@@ -938,16 +936,9 @@
{
TComPic *refpic = slice->getRefPic(l, ref);
- /* indicate which reference picture we might wait for,
- * prior to checking recon row count */
- m_blockRefPOC = refpic->getPOC();
- while ((refpic->m_reconRowCount != (uint32_t)m_numRows) &&
- (refpic->m_reconRowCount < row + refLagRows))
- {
- m_reconRowWait.wait();
- }
-
- m_blockRefPOC = -1;
+ int reconRowCount = refpic->m_reconRowCount.get();
+ while ((reconRowCount != m_numRows) && (reconRowCount < row + refLagRows))
+ reconRowCount = refpic->m_reconRowCount.waitForChange(reconRowCount);
if (slice->getPPS()->getUseWP() && slice->getSliceType() == P_SLICE && m_mref[l][ref].isWeighted)
{
@@ -982,16 +973,9 @@
{
TComPic *refpic = slice->getRefPic(list, ref);
- /* indicate which reference picture we might wait for,
- * prior to checking recon row count */
- m_blockRefPOC = refpic->getPOC();
- while ((refpic->m_reconRowCount != (uint32_t)m_numRows) &&
- (refpic->m_reconRowCount < i + refLagRows))
- {
- m_reconRowWait.wait();
- }
-
- m_blockRefPOC = -1;
+ int reconRowCount = refpic->m_reconRowCount.get();
+ while ((reconRowCount != m_numRows) && (reconRowCount < i + refLagRows))
+ reconRowCount = refpic->m_reconRowCount.waitForChange(reconRowCount);
if (slice->getPPS()->getUseWP() && slice->getSliceType() == P_SLICE && m_mref[l][ref].isWeighted)
{
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/frameencoder.h
--- a/source/encoder/frameencoder.h Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/frameencoder.h Mon Mar 17 15:27:27 2014 -0500
@@ -112,10 +112,6 @@
}
}
- volatile int m_blockRefPOC;
-
- Event m_reconRowWait;
-
TEncEntropy* getEntropyCoder(int row) { return &this->m_rows[row].m_entropyCoder; }
TEncSbac* getSbacCoder(int row) { return &this->m_rows[row].m_sbacCoder; }
diff -r 8d5deb7cafd8 -r 921f6642623d source/encoder/framefilter.cpp
--- a/source/encoder/framefilter.cpp Mon Mar 17 00:47:24 2014 -0500
+++ b/source/encoder/framefilter.cpp Mon Mar 17 15:27:27 2014 -0500
@@ -263,8 +263,7 @@
}
// Notify other FrameEncoders that this row of reconstructed pixels is available
- m_pic->m_reconRowCount++;
- cfg->signalReconRowCompleted(m_pic->getPOC());
+ m_pic->m_reconRowCount.set(m_pic->m_reconRowCount.get() + 1);
int cuAddr = lineStartCUAddr;
if (m_param->bEnablePsnr)
More information about the x265-devel
mailing list