[x265] [PATCH 2 of 3] pool: introduce bonded task groups

Steve Borho steve at borho.org
Thu Feb 19 20:40:43 CET 2015


# HG changeset patch
# User Steve Borho <steve at borho.org>
# Date 1424360388 21600
#      Thu Feb 19 09:39:48 2015 -0600
# Node ID bc969aedef71552dfd6a914d99543f3ac4eb16e3
# Parent  4e72e207f86574e43dc9c63bc4ddb6fb61cfbb7e
pool: introduce bonded task groups

Since the thread pools no longer support dynamically adding and removing
JobProviders, we need a new mechanism to enlist the help of worker threads for
short-term work.

A bonded task group is a simple, stack-allocated data structure which tracks
which workers were enlisted to help and when they have all finished. It will
first try to enlist idle threads which most recently worked on the same frame,
then any idle threads in the same pool.

This commit switches --pmode and --pme to use bonded task groups.
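
For readers skimming the patch, the claim-and-drain pattern described above can
be sketched in isolation. This is only an illustration under stated
assumptions, not the x265 code: MiniTaskGroup, acquireTask() and runGroup() are
hypothetical names, std::mutex stands in for Lock, freshly spawned std::thread
helpers stand in for idle pool workers, and joining them plays the role of
waitForExit().

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a bonded task group; NOT the x265 class.
struct MiniTaskGroup
{
    std::mutex       lock;
    int              jobTotal;
    int              jobAcquired;
    std::vector<int> results; // one slot per task, written only by the claiming thread

    explicit MiniTaskGroup(int total) : jobTotal(total), jobAcquired(0), results(total, 0) {}

    // Claim the next unclaimed task index under the lock, or return -1 if none remain.
    int acquireTask()
    {
        std::lock_guard<std::mutex> guard(lock);
        return jobTotal > jobAcquired ? jobAcquired++ : -1;
    }

    // Called by the master and by every helper: drain tasks until none are left.
    void processTasks()
    {
        for (int task = acquireTask(); task >= 0; task = acquireTask())
            results[task] = task * task; // placeholder for real work
    }
};

// Run `total` tasks on `peers` helper threads plus the calling (master) thread
// and return the sum of all task results.
int runGroup(int total, int peers)
{
    MiniTaskGroup group(total);

    // Assumption: spawn helpers instead of bonding idle pool threads.
    std::vector<std::thread> helpers;
    for (int i = 0; i < peers; i++)
        helpers.emplace_back([&group] { group.processTasks(); });

    group.processTasks();          // the master participates as well

    for (std::thread& t : helpers) // analogous to waitForExit()
        t.join();

    int sum = 0;
    for (int r : group.results)
        sum += r;
    return sum;
}
```

Calling runGroup(8, 3) and runGroup(8, 0) should give the same sum, which
mirrors the point of the design: the master drains whatever the helpers do not
claim, so the result does not depend on how many peers were actually enlisted.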

diff -r 4e72e207f865 -r bc969aedef71 doc/reST/threading.rst
--- a/doc/reST/threading.rst	Thu Feb 19 09:45:59 2015 -0600
+++ b/doc/reST/threading.rst	Thu Feb 19 09:39:48 2015 -0600
@@ -75,24 +75,35 @@
 thread count to be higher than if WPP was enabled.  The exact formulas
 are described in the next section.
 
+Bonded Task Groups
+==================
+
+If a worker thread job has work which can be performed in parallel by
+many threads, it may allocate a bonded task group and enlist the help of
+other idle worker threads in the same pool. Those threads will cooperate
+to complete the work of the bonded task group and then return to their
+idle states. The larger and more uniform those tasks are, the better the
+bonded task group will perform.
+
 Parallel Mode Analysis
-======================
+~~~~~~~~~~~~~~~~~~~~~~
 
 When :option:`--pmode` is enabled, each CU (at all depths from 64x64 to
-8x8) will distribute its analysis work to the thread pool. Each analysis
-job will measure the cost of one prediction for the CU: merge, skip,
-intra, inter (2Nx2N, Nx2N, 2NxN, and AMP). At slower presets, the amount
-of increased parallelism is often enough to be able to reduce frame
-parallelism while achieving the same overall CPU utilization. Reducing
-frame threads is often beneficial to ABR and VBV rate control.
+8x8) will distribute its analysis work to the thread pool via a bonded
+task group. Each analysis job will measure the cost of one prediction
+for the CU: merge, skip, intra, inter (2Nx2N, Nx2N, 2NxN, and AMP). At
+slower presets, the amount of increased parallelism is often enough to
+be able to reduce frame parallelism while achieving the same overall CPU
+utilization. Reducing frame threads is often beneficial to ABR and VBV
+rate control.
 
 Parallel Motion Estimation
-==========================
+~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 When :option:`--pme` is enabled all of the analysis functions which
 perform motion searches to reference frames will distribute those motion
-searches as jobs for worker threads (if more than two motion searches
-are required).
+searches as jobs for worker threads via a bonded task group (if more
+than two motion searches are required).
 
 Frame Threading
 ===============
diff -r 4e72e207f865 -r bc969aedef71 source/common/threadpool.h
--- a/source/common/threadpool.h	Thu Feb 19 09:45:59 2015 -0600
+++ b/source/common/threadpool.h	Thu Feb 19 09:39:48 2015 -0600
@@ -104,6 +104,68 @@
     static int  getNumaNodeCount();
     static void setThreadNodeAffinity(int node);
 };
+
+/* Any worker thread may enlist the help of idle worker threads from the same
+ * job provider. Task groups must derive from this class and implement the
+ * processTasks() method.  To use, an instance must be instantiated by a worker
+ * thread (referred to as the master thread) and then tryBondPeers() must be
+ * called. If it returns non-zero then some number of slave worker threads are
+ * already in the process of calling your processTasks() function. The master
+ * thread should participate and call processTasks() itself. When
+ * waitForExit() returns, all bonded peer threads are guaranteed to have
+ * exited processTasks(). Since the thread count is small, it uses explicit
+ * locking instead of atomic counters and bitmasks */
+class BondedTaskGroup
+{
+public:
+
+    Lock              m_lock;
+    ThreadSafeInteger m_exitedPeerCount;
+    int               m_bondedPeerCount;
+    int               m_jobTotal;
+    int               m_jobAcquired;
+
+    BondedTaskGroup()  { m_bondedPeerCount = m_jobTotal = m_jobAcquired = 0; }
+
+    /* Do not allow the instance to be destroyed before all bonded peers have
+     * exited processTasks() */
+    ~BondedTaskGroup() { waitForExit(); }
+
+    /* Try to enlist the help of idle worker threads most recently associated
+     * with the given job provider and "bond" them to work on your tasks. Up to
+     * maxPeers worker threads will call your processTasks() method. */
+    int tryBondPeers(JobProvider& jp, int maxPeers)
+    {
+        int count = jp.m_pool->tryBondPeers(maxPeers, jp.m_ownerBitmap, *this);
+        m_bondedPeerCount += count;
+        return count;
+    }
+
+    /* Try to enlist the help of any idle worker threads and "bond" them to work
+     * on your tasks. Up to maxPeers worker threads will call your
+     * processTasks() method. */
+    int tryBondPeers(ThreadPool& pool, int maxPeers)
+    {
+        int count = pool.tryBondPeers(maxPeers, ALL_POOL_THREADS, *this);
+        m_bondedPeerCount += count;
+        return count;
+    }
+
+    /* Returns when all bonded peers have exited processTasks(). It does *NOT*
+     * ensure all tasks are completed (but this is generally implied). */
+    void waitForExit()
+    {
+        int exited = m_exitedPeerCount.get();
+        while (m_bondedPeerCount != exited)
+            exited = m_exitedPeerCount.waitForChange(exited);
+    }
+
+    /* Derived classes must define this method. The worker thread ID may be
+     * used to index into thread local data, or ignored.  The ID will be between
+     * 0 and jp.m_numWorkers - 1 */
+    virtual void processTasks(int workerThreadId) = 0;
+};
+
 } // end namespace x265
 
 #endif // ifndef X265_THREADPOOL_H
diff -r 4e72e207f865 -r bc969aedef71 source/encoder/analysis.cpp
--- a/source/encoder/analysis.cpp	Thu Feb 19 09:45:59 2015 -0600
+++ b/source/encoder/analysis.cpp	Thu Feb 19 09:39:48 2015 -0600
@@ -71,7 +71,6 @@
 
 Analysis::Analysis()
 {
-    m_totalNumJobs = m_numAcquiredJobs = m_numCompletedJobs = 0;
     m_reuseIntraDataCTU = NULL;
     m_reuseInterDataCTU = NULL;
 }
@@ -324,198 +323,158 @@
         md.bestMode->reconYuv.copyToPicYuv(*m_frame->m_reconPic, parentCTU.m_cuAddr, cuGeom.encodeIdx);
 }
 
-bool Analysis::findJob(int threadId)
+void Analysis::PMODE::processTasks(int workerThreadId)
 {
-    /* try to acquire a CU mode to analyze */
-    m_pmodeLock.acquire();
-    if (m_totalNumJobs > m_numAcquiredJobs)
-    {
-        int id = m_numAcquiredJobs++;
-        m_pmodeLock.release();
-
-        {
-            ProfileScopeEvent(pmode);
-            ProfileCUScope(m_modeDepth[m_curGeom->depth].pred[PRED_2Nx2N].cu, pmodeTime, countPModeTasks);
-            parallelModeAnalysis(threadId, id);
-        }
-
-        m_pmodeLock.acquire();
-        if (++m_numCompletedJobs == m_totalNumJobs)
-            m_modeCompletionEvent.trigger();
-        m_pmodeLock.release();
-        return true;
-    }
-    else
-        m_pmodeLock.release();
-
-    m_meLock.acquire();
-    if (m_totalNumME > m_numAcquiredME)
-    {
-        int id = m_numAcquiredME++;
-        m_meLock.release();
-
-        {
-            ProfileScopeEvent(pme);
-            ProfileCUScope(m_curInterMode->cu, pmeTime, countPMETasks);
-            parallelME(threadId, id);
-        }
-
-        m_meLock.acquire();
-        if (++m_numCompletedME == m_totalNumME)
-            m_meCompletionEvent.trigger();
-        m_meLock.release();
-        return true;
-    }
-    else
-        m_meLock.release();
-
-    return false;
+    ProfileScopeEvent(pmode);
+    master.processPmode(*this, master.m_tld[workerThreadId].analysis);
 }
 
-void Analysis::parallelME(int threadId, int meId)
+/* process pmode jobs until none remain; may be called by the master thread or by
+ * a bonded peer (slave) thread via PMODE::processTasks() */
+void Analysis::processPmode(PMODE& pmode, Analysis& slave)
 {
-    Analysis* slave;
-
-    if (threadId == -1)
-        slave = this;
-    else
+    /* acquire a mode task, else exit early */
+    int task;
+    pmode.m_lock.acquire();
+    if (pmode.m_jobTotal > pmode.m_jobAcquired)
     {
-        slave = &m_tld[threadId].analysis;
-        slave->setQP(*m_slice, m_rdCost.m_qp);
-        slave->m_slice = m_slice;
-        slave->m_frame = m_frame;
-
-        slave->m_me.setSourcePU(*m_curInterMode->fencYuv, m_curInterMode->cu.m_cuAddr, m_curGeom->encodeIdx, m_puAbsPartIdx, m_puWidth, m_puHeight);
-        slave->prepMotionCompensation(m_curInterMode->cu, *m_curGeom, m_curPart);
-    }
-
-    if (meId < m_slice->m_numRefIdx[0])
-        slave->singleMotionEstimation(*this, *m_curInterMode, *m_curGeom, m_curPart, 0, meId);
-    else
-        slave->singleMotionEstimation(*this, *m_curInterMode, *m_curGeom, m_curPart, 1, meId - m_slice->m_numRefIdx[0]);
-}
-
-void Analysis::parallelModeAnalysis(int threadId, int jobId)
-{
-    Analysis* slave;
-
-    if (threadId == -1)
-        slave = this;
-    else
-    {
-        slave = &m_tld[threadId].analysis;
-        slave->m_slice = m_slice;
-        slave->m_frame = m_frame;
-        slave->setQP(*m_slice, m_rdCost.m_qp);
-        slave->invalidateContexts(0);
-    }
-
-    ModeDepth& md = m_modeDepth[m_curGeom->depth];
-
-    if (m_param->rdLevel <= 4)
-    {
-        switch (jobId)
-        {
-        case 0:
-            if (slave != this)
-                slave->m_rqt[m_curGeom->depth].cur.load(m_rqt[m_curGeom->depth].cur);
-            slave->checkIntraInInter(md.pred[PRED_INTRA], *m_curGeom);
-            if (m_param->rdLevel > 2)
-                slave->encodeIntraInInter(md.pred[PRED_INTRA], *m_curGeom);
-            break;
-
-        case 1:
-            slave->checkInter_rd0_4(md.pred[PRED_2Nx2N], *m_curGeom, SIZE_2Nx2N);
-            if (m_slice->m_sliceType == B_SLICE)
-                slave->checkBidir2Nx2N(md.pred[PRED_2Nx2N], md.pred[PRED_BIDIR], *m_curGeom);
-            break;
-
-        case 2:
-            slave->checkInter_rd0_4(md.pred[PRED_Nx2N], *m_curGeom, SIZE_Nx2N);
-            break;
-
-        case 3:
-            slave->checkInter_rd0_4(md.pred[PRED_2NxN], *m_curGeom, SIZE_2NxN);
-            break;
-
-        case 4:
-            slave->checkInter_rd0_4(md.pred[PRED_2NxnU], *m_curGeom, SIZE_2NxnU);
-            break;
-
-        case 5:
-            slave->checkInter_rd0_4(md.pred[PRED_2NxnD], *m_curGeom, SIZE_2NxnD);
-            break;
-
-        case 6:
-            slave->checkInter_rd0_4(md.pred[PRED_nLx2N], *m_curGeom, SIZE_nLx2N);
-            break;
-
-        case 7:
-            slave->checkInter_rd0_4(md.pred[PRED_nRx2N], *m_curGeom, SIZE_nRx2N);
-            break;
-
-        default:
-            X265_CHECK(0, "invalid job ID for parallel mode analysis\n");
-            break;
-        }
+        task = pmode.m_jobAcquired++;
+        pmode.m_lock.release();
     }
     else
     {
-        bool bMergeOnly = m_curGeom->log2CUSize == 6;
-        if (slave != this)
+        pmode.m_lock.release();
+        return;
+    }
+
+    ModeDepth& md = m_modeDepth[pmode.cuGeom.depth];
+    bool bMergeOnly = pmode.cuGeom.log2CUSize == 6;
+
+    ProfileCUScope(md.pred[PRED_2Nx2N].cu, pmodeTime, countPModeTasks);
+
+    /* setup slave Analysis */
+    if (&slave != this)
+    {
+        slave.m_slice = m_slice;
+        slave.m_frame = m_frame;
+        slave.setQP(*m_slice, m_rdCost.m_qp);
+        slave.invalidateContexts(0);
+
+        if (m_param->rdLevel >= 5)
         {
-            slave->m_rqt[m_curGeom->depth].cur.load(m_rqt[m_curGeom->depth].cur);
-            slave->m_quant.setQPforQuant(md.pred[PRED_2Nx2N].cu);
-        }
-        
-        switch (jobId)
-        {
-        case 0:
-            slave->checkIntra(md.pred[PRED_INTRA], *m_curGeom, SIZE_2Nx2N, NULL, NULL);
-            if (m_curGeom->log2CUSize == 3 && m_slice->m_sps->quadtreeTULog2MinSize < 3)
-                slave->checkIntra(md.pred[PRED_INTRA_NxN], *m_curGeom, SIZE_NxN, NULL, NULL);
-            break;
-
-        case 1:
-            slave->checkInter_rd5_6(md.pred[PRED_2Nx2N], *m_curGeom, SIZE_2Nx2N, false);
-            md.pred[PRED_BIDIR].rdCost = MAX_INT64;
-            if (m_slice->m_sliceType == B_SLICE)
-            {
-                slave->checkBidir2Nx2N(md.pred[PRED_2Nx2N], md.pred[PRED_BIDIR], *m_curGeom);
-                if (md.pred[PRED_BIDIR].sa8dCost < MAX_INT64)
-                    slave->encodeResAndCalcRdInterCU(md.pred[PRED_BIDIR], *m_curGeom);
-            }
-            break;
-
-        case 2:
-            slave->checkInter_rd5_6(md.pred[PRED_Nx2N], *m_curGeom, SIZE_Nx2N, false);
-            break;
-
-        case 3:
-            slave->checkInter_rd5_6(md.pred[PRED_2NxN], *m_curGeom, SIZE_2NxN, false);
-            break;
-
-        case 4:
-            slave->checkInter_rd5_6(md.pred[PRED_2NxnU], *m_curGeom, SIZE_2NxnU, bMergeOnly);
-            break;
-
-        case 5:
-            slave->checkInter_rd5_6(md.pred[PRED_2NxnD], *m_curGeom, SIZE_2NxnD, bMergeOnly);
-            break;
-
-        case 6:
-            slave->checkInter_rd5_6(md.pred[PRED_nLx2N], *m_curGeom, SIZE_nLx2N, bMergeOnly);
-            break;
-
-        case 7:
-            slave->checkInter_rd5_6(md.pred[PRED_nRx2N], *m_curGeom, SIZE_nRx2N, bMergeOnly);
-            break;
-
-        default:
-            X265_CHECK(0, "invalid job ID for parallel mode analysis\n");
-            break;
+            slave.m_rqt[pmode.cuGeom.depth].cur.load(m_rqt[pmode.cuGeom.depth].cur);
+            slave.m_quant.setQPforQuant(md.pred[PRED_2Nx2N].cu);
         }
     }
+
+    /* perform Mode task, repeat until no more work is available */
+    do
+    {
+        if (m_param->rdLevel <= 4)
+        {
+            switch (pmode.modes[task])
+            {
+            case PRED_INTRA:
+                if (&slave != this)
+                    slave.m_rqt[pmode.cuGeom.depth].cur.load(m_rqt[pmode.cuGeom.depth].cur);
+                slave.checkIntraInInter(md.pred[PRED_INTRA], pmode.cuGeom);
+                if (m_param->rdLevel > 2)
+                    slave.encodeIntraInInter(md.pred[PRED_INTRA], pmode.cuGeom);
+                break;
+
+            case PRED_2Nx2N:
+                slave.checkInter_rd0_4(md.pred[PRED_2Nx2N], pmode.cuGeom, SIZE_2Nx2N);
+                if (m_slice->m_sliceType == B_SLICE)
+                    slave.checkBidir2Nx2N(md.pred[PRED_2Nx2N], md.pred[PRED_BIDIR], pmode.cuGeom);
+                break;
+
+            case PRED_Nx2N:
+                slave.checkInter_rd0_4(md.pred[PRED_Nx2N], pmode.cuGeom, SIZE_Nx2N);
+                break;
+
+            case PRED_2NxN:
+                slave.checkInter_rd0_4(md.pred[PRED_2NxN], pmode.cuGeom, SIZE_2NxN);
+                break;
+
+            case PRED_2NxnU:
+                slave.checkInter_rd0_4(md.pred[PRED_2NxnU], pmode.cuGeom, SIZE_2NxnU);
+                break;
+
+            case PRED_2NxnD:
+                slave.checkInter_rd0_4(md.pred[PRED_2NxnD], pmode.cuGeom, SIZE_2NxnD);
+                break;
+
+            case PRED_nLx2N:
+                slave.checkInter_rd0_4(md.pred[PRED_nLx2N], pmode.cuGeom, SIZE_nLx2N);
+                break;
+
+            case PRED_nRx2N:
+                slave.checkInter_rd0_4(md.pred[PRED_nRx2N], pmode.cuGeom, SIZE_nRx2N);
+                break;
+
+            default:
+                X265_CHECK(0, "invalid job ID for parallel mode analysis\n");
+                break;
+            }
+        }
+        else
+        {
+            switch (pmode.modes[task])
+            {
+            case PRED_INTRA:
+                slave.checkIntra(md.pred[PRED_INTRA], pmode.cuGeom, SIZE_2Nx2N, NULL, NULL);
+                if (pmode.cuGeom.log2CUSize == 3 && m_slice->m_sps->quadtreeTULog2MinSize < 3)
+                    slave.checkIntra(md.pred[PRED_INTRA_NxN], pmode.cuGeom, SIZE_NxN, NULL, NULL);
+                break;
+
+            case PRED_2Nx2N:
+                slave.checkInter_rd5_6(md.pred[PRED_2Nx2N], pmode.cuGeom, SIZE_2Nx2N, false);
+                md.pred[PRED_BIDIR].rdCost = MAX_INT64;
+                if (m_slice->m_sliceType == B_SLICE)
+                {
+                    slave.checkBidir2Nx2N(md.pred[PRED_2Nx2N], md.pred[PRED_BIDIR], pmode.cuGeom);
+                    if (md.pred[PRED_BIDIR].sa8dCost < MAX_INT64)
+                        slave.encodeResAndCalcRdInterCU(md.pred[PRED_BIDIR], pmode.cuGeom);
+                }
+                break;
+
+            case PRED_Nx2N:
+                slave.checkInter_rd5_6(md.pred[PRED_Nx2N], pmode.cuGeom, SIZE_Nx2N, false);
+                break;
+
+            case PRED_2NxN:
+                slave.checkInter_rd5_6(md.pred[PRED_2NxN], pmode.cuGeom, SIZE_2NxN, false);
+                break;
+
+            case PRED_2NxnU:
+                slave.checkInter_rd5_6(md.pred[PRED_2NxnU], pmode.cuGeom, SIZE_2NxnU, bMergeOnly);
+                break;
+
+            case PRED_2NxnD:
+                slave.checkInter_rd5_6(md.pred[PRED_2NxnD], pmode.cuGeom, SIZE_2NxnD, bMergeOnly);
+                break;
+
+            case PRED_nLx2N:
+                slave.checkInter_rd5_6(md.pred[PRED_nLx2N], pmode.cuGeom, SIZE_nLx2N, bMergeOnly);
+                break;
+
+            case PRED_nRx2N:
+                slave.checkInter_rd5_6(md.pred[PRED_nRx2N], pmode.cuGeom, SIZE_nRx2N, bMergeOnly);
+                break;
+
+            default:
+                X265_CHECK(0, "invalid job ID for parallel mode analysis\n");
+                break;
+            }
+        }
+
+        task = -1;
+        pmode.m_lock.acquire();
+        if (pmode.m_jobTotal > pmode.m_jobAcquired)
+            task = pmode.m_jobAcquired++;
+        pmode.m_lock.release();
+    }
+    while (task >= 0);
 }
 
 void Analysis::compressInterCU_dist(const CUData& parentCTU, const CUGeom& cuGeom)
@@ -536,59 +495,37 @@
         int bTryAmp = m_slice->m_sps->maxAMPDepth > depth && (cuGeom.log2CUSize < 6 || m_param->rdLevel > 4);
         int bTryIntra = m_slice->m_sliceType != B_SLICE || m_param->bIntraInBFrames;
 
+        PMODE pmode(*this, cuGeom);
+
         /* Initialize all prediction CUs based on parentCTU */
-        md.pred[PRED_2Nx2N].cu.initSubCU(parentCTU, cuGeom);
-        md.pred[PRED_BIDIR].cu.initSubCU(parentCTU, cuGeom);
         md.pred[PRED_MERGE].cu.initSubCU(parentCTU, cuGeom);
         md.pred[PRED_SKIP].cu.initSubCU(parentCTU, cuGeom);
+        if (bTryIntra)
+        {
+            md.pred[PRED_INTRA].cu.initSubCU(parentCTU, cuGeom);
+            if (depth == g_maxCUDepth && cuGeom.log2CUSize > m_slice->m_sps->quadtreeTULog2MinSize && m_param->rdLevel >= 5)
+                md.pred[PRED_INTRA_NxN].cu.initSubCU(parentCTU, cuGeom);
+            pmode.modes[pmode.m_jobTotal++] = PRED_INTRA;
+        }
+        md.pred[PRED_2Nx2N].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_2Nx2N;
+        md.pred[PRED_BIDIR].cu.initSubCU(parentCTU, cuGeom);
         if (m_param->bEnableRectInter)
         {
-            md.pred[PRED_2NxN].cu.initSubCU(parentCTU, cuGeom);
-            md.pred[PRED_Nx2N].cu.initSubCU(parentCTU, cuGeom);
+            md.pred[PRED_2NxN].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_2NxN;
+            md.pred[PRED_Nx2N].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_Nx2N;
         }
         if (bTryAmp)
         {
-            md.pred[PRED_2NxnU].cu.initSubCU(parentCTU, cuGeom);
-            md.pred[PRED_2NxnD].cu.initSubCU(parentCTU, cuGeom);
-            md.pred[PRED_nLx2N].cu.initSubCU(parentCTU, cuGeom);
-            md.pred[PRED_nRx2N].cu.initSubCU(parentCTU, cuGeom);
-        }
-        if (bTryIntra)
-        {
-            md.pred[PRED_INTRA].cu.initSubCU(parentCTU, cuGeom);
-            if (cuGeom.log2CUSize == 3 && m_slice->m_sps->quadtreeTULog2MinSize < 3)
-                md.pred[PRED_INTRA_NxN].cu.initSubCU(parentCTU, cuGeom);
+            md.pred[PRED_2NxnU].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_2NxnU;
+            md.pred[PRED_2NxnD].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_2NxnD;
+            md.pred[PRED_nLx2N].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_nLx2N;
+            md.pred[PRED_nRx2N].cu.initSubCU(parentCTU, cuGeom); pmode.modes[pmode.m_jobTotal++] = PRED_nRx2N;
         }
 
-        m_pmodeLock.acquire();
-        m_totalNumJobs = 2 + m_param->bEnableRectInter * 2 + bTryAmp * 4;
-        m_numAcquiredJobs = !bTryIntra;
-        m_numCompletedJobs = m_numAcquiredJobs;
-        m_curGeom = &cuGeom;
-        m_bJobsQueued = true;
-        JobProvider::enqueue();
-        m_pmodeLock.release();
-
-        for (int i = 0; i < m_totalNumJobs - m_numCompletedJobs; i++)
-            m_pool->pokeIdleThread();
+        pmode.tryBondPeers(*m_frame->m_encData->m_jobProvider, pmode.m_jobTotal);
 
         /* participate in processing jobs, until all are distributed */
-        m_pmodeLock.acquire();
-        while (m_totalNumJobs > m_numAcquiredJobs)
-        {
-            int id = m_numAcquiredJobs++;
-            m_pmodeLock.release();
-
-            parallelModeAnalysis(-1, id);
-
-            m_pmodeLock.acquire();
-            if (++m_numCompletedJobs == m_totalNumJobs)
-                m_modeCompletionEvent.trigger();
-        }
-        m_pmodeLock.release();
-
-        JobProvider::dequeue();
-        m_bJobsQueued = false;
+        processPmode(pmode, *this);
 
         /* the master worker thread (this one) does merge analysis. By doing
          * merge after all the other jobs are at least started, we usually avoid
@@ -600,7 +537,7 @@
 
             {
                 ProfileCUScope(parentCTU, pmodeBlockTime, countPModeMasters);
-                m_modeCompletionEvent.wait();
+                pmode.waitForExit();
             }
 
             /* select best inter mode based on sa8d cost */
@@ -681,7 +618,7 @@
             checkMerge2Nx2N_rd5_6(md.pred[PRED_SKIP], md.pred[PRED_MERGE], cuGeom, false);
             {
                 ProfileCUScope(parentCTU, pmodeBlockTime, countPModeMasters);
-                m_modeCompletionEvent.wait();
+                pmode.waitForExit();
             }
 
             checkBestMode(md.pred[PRED_2Nx2N], depth);
diff -r 4e72e207f865 -r bc969aedef71 source/encoder/analysis.h
--- a/source/encoder/analysis.h	Thu Feb 19 09:45:59 2015 -0600
+++ b/source/encoder/analysis.h	Thu Feb 19 09:39:48 2015 -0600
@@ -70,6 +70,25 @@
         CUDataMemPool  cuMemPool;
     };
 
+    class PMODE : public BondedTaskGroup
+    {
+    public:
+
+        Analysis&     master;
+        const CUGeom& cuGeom;
+        int           modes[MAX_PRED_TYPES];
+
+        PMODE(Analysis& m, const CUGeom& g) : master(m), cuGeom(g) {}
+
+        void processTasks(int workerThreadId);
+
+    protected:
+
+        PMODE operator=(const PMODE&);
+    };
+
+    void processPmode(PMODE& pmode, Analysis& slave);
+
     ModeDepth m_modeDepth[NUM_CU_DEPTH];
     bool      m_bTryLossless;
     bool      m_bChromaSa8d;
@@ -83,16 +102,6 @@
 
 protected:
 
-    /* mode analysis distribution */
-    int           m_totalNumJobs;
-    volatile int  m_numAcquiredJobs;
-    volatile int  m_numCompletedJobs;
-    Lock          m_pmodeLock;
-    Event         m_modeCompletionEvent;
-    bool findJob(int threadId);
-    void parallelModeAnalysis(int threadId, int jobId);
-    void parallelME(int threadId, int meId);
-
     /* Analysis data for load/save modes, keeps getting incremented as CTU analysis proceeds and data is consumed or read */
     analysis_intra_data* m_reuseIntraDataCTU;
     analysis_inter_data* m_reuseInterDataCTU;
diff -r 4e72e207f865 -r bc969aedef71 source/encoder/search.cpp
--- a/source/encoder/search.cpp	Thu Feb 19 09:45:59 2015 -0600
+++ b/source/encoder/search.cpp	Thu Feb 19 09:39:48 2015 -0600
@@ -30,6 +30,9 @@
 #include "entropy.h"
 #include "rdcost.h"
 
+#include "analysis.h"  // TLD
+#include "framedata.h"
+
 using namespace x265;
 
 #if _MSC_VER
@@ -42,7 +45,7 @@
 
 ALIGN_VAR_32(const int16_t, Search::zeroShort[MAX_CU_SIZE]) = { 0 };
 
-Search::Search() : JobProvider(NULL)
+Search::Search()
 {
     memset(m_rqt, 0, sizeof(m_rqt));
 
@@ -56,8 +59,6 @@
     m_param = NULL;
     m_slice = NULL;
     m_frame = NULL;
-    m_bJobsQueued = false;
-    m_totalNumME = m_numAcquiredME = m_numCompletedME = 0;
 }
 
 bool Search::initSearch(const x265_param& param, ScalingList& scalingList)
@@ -1860,6 +1861,57 @@
     return outCost;
 }
 
+void Search::PME::processTasks(int workerThreadId)
+{
+    ProfileScopeEvent(pme);
+    master.processPME(*this, master.m_tld[workerThreadId].analysis);
+}
+
+void Search::processPME(PME& pme, Search& slave)
+{
+    /* acquire a motion estimation job, else exit early */
+    int meId;
+    pme.m_lock.acquire();
+    if (pme.m_jobTotal > pme.m_jobAcquired)
+    {
+        meId = pme.m_jobAcquired++;
+        pme.m_lock.release();
+    }
+    else
+    {
+        pme.m_lock.release();
+        return;
+    }
+
+    ProfileCUScope(pme.mode.cu, pmeTime, countPMETasks);
+
+    /* Setup slave Search instance for ME for master's CU */
+    if (&slave != this)
+    {
+        slave.setQP(*m_slice, m_rdCost.m_qp);
+        slave.m_slice = m_slice;
+        slave.m_frame = m_frame;
+        slave.m_me.setSourcePU(*pme.mode.fencYuv, pme.mode.cu.m_cuAddr, pme.cuGeom.encodeIdx, m_puAbsPartIdx, m_puWidth, m_puHeight);
+        slave.prepMotionCompensation(pme.mode.cu, pme.cuGeom, pme.puIdx);
+    }
+
+    /* Perform ME, repeat until no more work is available */
+    do
+    {
+        if (meId < m_slice->m_numRefIdx[0])
+            slave.singleMotionEstimation(*this, pme.mode, pme.cuGeom, pme.puIdx, 0, meId);
+        else
+            slave.singleMotionEstimation(*this, pme.mode, pme.cuGeom, pme.puIdx, 1, meId - m_slice->m_numRefIdx[0]);
+
+        meId = -1;
+        pme.m_lock.acquire();
+        if (pme.m_jobTotal > pme.m_jobAcquired)
+            meId = pme.m_jobAcquired++;
+        pme.m_lock.release();
+    }
+    while (meId >= 0);
+}
+
 /* this function assumes the caller has configured its MotionEstimation engine with the
  * correct source plane and source PU, and has called prepMotionCompensation() to set
  * m_puAbsPartIdx, m_puWidth, and m_puHeight */
@@ -1943,7 +1995,8 @@
     const int* numRefIdx = slice->m_numRefIdx;
     uint32_t lastMode = 0;
     int      totalmebits = 0;
-    bool     bDistributed = m_param->bDistributeMotionEstimation && (numRefIdx[0] + numRefIdx[1]) > 2;
+    int      numME = numRefIdx[0] + numRefIdx[1];
+    bool     bTryDistributed = m_param->bDistributeMotionEstimation && numME > 2;
     MV       mvzero(0, 0);
     Yuv&     tmpPredYuv = m_rqt[cuGeom.depth].tmpPredYuv;
 
@@ -1997,6 +2050,7 @@
         bestME[1].cost = MAX_UINT;
 
         getBlkBits((PartSize)cu.m_partSize[0], slice->isInterP(), puIdx, lastMode, m_listSelBits);
+        bool bDoUnidir = true;
 
         /* Uni-directional prediction */
         if (m_param->analysisMode == X265_ANALYSIS_LOAD)
@@ -2058,62 +2112,30 @@
                     bestME[l].bits = bits;
                 }
             }
+            bDoUnidir = false;
         }
-        else if (bDistributed)
+        else if (bTryDistributed)
         {
-            m_meLock.acquire();
-            m_curInterMode = &interMode;
-            m_curGeom = &cuGeom;
-            m_curPart = puIdx;
-            m_totalNumME = 0;
-            m_numAcquiredME = 1;
-            m_numCompletedME = 0;
-            m_totalNumME = numRefIdx[0] + numRefIdx[1];
-            m_meLock.release();
-
-            if (!m_bJobsQueued)
-                JobProvider::enqueue();
-
-            for (int i = 1; i < m_totalNumME; i++)
-                m_pool->pokeIdleThread();
-
-            do
+            PME pme(*this, interMode, cuGeom, puIdx);
+            pme.m_jobTotal = numME;
+            pme.m_jobAcquired = 1; /* reserve L0-0 */
+
+            if (pme.tryBondPeers(*m_frame->m_encData->m_jobProvider, numME - 1))
             {
-                m_meLock.acquire();
-                if (m_totalNumME > m_numAcquiredME)
-                {
-                    int id = m_numAcquiredME++;
-                    m_meLock.release();
-
-                    if (id < numRefIdx[0])
-                        singleMotionEstimation(*this, interMode, cuGeom, puIdx, 0, id);
-                    else
-                        singleMotionEstimation(*this, interMode, cuGeom, puIdx, 1, id - numRefIdx[0]);
-
-                    m_meLock.acquire();
-                    m_numCompletedME++;
-                    m_meLock.release();
-                }
-                else
-                    m_meLock.release();
+                processPME(pme, *this);
+
+                singleMotionEstimation(*this, interMode, cuGeom, puIdx, 0, 0); /* L0-0 */
+
+                bDoUnidir = false;
+
+                ProfileCUScopeNamed(pmeWaitScope, interMode.cu, pmeBlockTime, countPMEMasters);
+                pme.waitForExit();
             }
-            while (m_totalNumME > m_numAcquiredME);
-
-            if (!m_bJobsQueued)
-                JobProvider::dequeue();
-
-            /* we saved L0-0 for ourselves */
-            singleMotionEstimation(*this, interMode, cuGeom, puIdx, 0, 0);
-
-            m_meLock.acquire();
-            if (++m_numCompletedME == m_totalNumME)
-                m_meCompletionEvent.trigger();
-            m_meLock.release();
-
-            ProfileCUScopeNamed(pmeWaitScope, interMode.cu, pmeBlockTime, countPMEMasters);
-            m_meCompletionEvent.wait();
+
+            /* if no peer threads were bonded, fall back to doing unidirectional
+             * searches ourselves without overhead of singleMotionEstimation() */
         }
-        else
+        if (bDoUnidir)
         {
             for (int l = 0; l < numPredDir; l++)
             {
diff -r 4e72e207f865 -r bc969aedef71 source/encoder/search.h
--- a/source/encoder/search.h	Thu Feb 19 09:45:59 2015 -0600
+++ b/source/encoder/search.h	Thu Feb 19 09:39:48 2015 -0600
@@ -210,7 +210,7 @@
     return idx + (idx < numIdx - 1);
 }
 
-class Search : public JobProvider, public Predict
+class Search : public Predict
 {
 public:
 
@@ -220,6 +220,7 @@
     Quant           m_quant;
     RDCost          m_rdCost;
     const x265_param* m_param;
+    ThreadPool*     m_pool;
     Frame*          m_frame;
     const Slice*    m_slice;
 
@@ -271,21 +272,34 @@
     // pick be chroma mode from available using just sa8d costs
     void     getBestIntraModeChroma(Mode& intraMode, const CUGeom& cuGeom);
 
+    class PME : public BondedTaskGroup
+    {
+    public:
+
+        Search&       master;
+        Mode&         mode;
+        const CUGeom& cuGeom;
+        int           puIdx;
+
+        PME(Search& s, Mode& m, const CUGeom& g, int p) : master(s), mode(m), cuGeom(g), puIdx(p) {}
+
+        void processTasks(int workerThreadId);
+
+    protected:
+
+        PME operator=(const PME&);
+    };
+
+    void     processPME(PME& pme, Search& slave);
+    void     singleMotionEstimation(Search& master, Mode& interMode, const CUGeom& cuGeom, int part, int list, int ref);
+
 protected:
 
     /* motion estimation distribution */
     ThreadLocalData* m_tld;
-    Mode*         m_curInterMode;
-    const CUGeom* m_curGeom;
-    int           m_curPart;
+
     uint32_t      m_listSelBits[3];
-    int           m_totalNumME;
-    volatile int  m_numAcquiredME;
-    volatile int  m_numCompletedME;
-    Event         m_meCompletionEvent;
     Lock          m_meLock;
-    bool          m_bJobsQueued;
-    void     singleMotionEstimation(Search& master, Mode& interMode, const CUGeom& cuGeom, int part, int list, int ref);
 
     void     saveResidualQTData(CUData& cu, ShortYuv& resiYuv, uint32_t absPartIdx, uint32_t tuDepth);
 

