[x265] [PATCH 1 of 3] NUMA based thread pools
Steve Borho
steve at borho.org
Thu Feb 19 20:40:42 CET 2015
# HG changeset patch
# User Steve Borho <steve at borho.org>
# Date 1424360759 21600
# Thu Feb 19 09:45:59 2015 -0600
# Node ID 4e72e207f86574e43dc9c63bc4ddb6fb61cfbb7e
# Parent d34f9d23b370a8bb2120df81bca8c43cc1a0a226
NUMA based thread pools
On systems with multiple NUMA nodes (typically multi-socket workstations or
servers) x265 will now create one thread pool per NUMA node and distribute
frame encoders to the pools evenly. Each frame encoder will use only one
thread pool and its worker threads. This prevents threads from different NUMA
nodes working together on the same frame.
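The pool-count arithmetic used by allocThreadPools() in this patch can be sketched in isolation as follows. This is only an illustration: countThreadPools() and maxProvidersPerPool() are hypothetical names, and the default of 64 threads per pool assumes the x86_64 build (MAX_POOL_THREADS is 32 otherwise).

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper mirroring the pool-count arithmetic in allocThreadPools().
// coresPerNode[i] is the number of logical cores usable on NUMA node i.
int countThreadPools(const std::vector<int>& coresPerNode, int frameNumThreads,
                     int maxPoolThreads = 64)
{
    int numPools = 0;
    for (int cores : coresPerNode)
        numPools += (cores + maxPoolThreads - 1) / maxPoolThreads; // round up; 0 cores adds 0

    if (!numPools)
        return 0; // no usable cores, so no pools at all

    // never create more pools than there are frame encoders to feed them
    if (numPools > frameNumThreads)
        numPools = std::max(frameNumThreads / 2, 1);
    return numPools;
}

// Upper bound on job providers per pool: the frame encoders plus the
// lookahead, split across the pools and rounded up.
int maxProvidersPerPool(int frameNumThreads, int numPools)
{
    assert(numPools > 0);
    return (frameNumThreads + 1 + numPools - 1) / numPools;
}
```

With two 16-core nodes and four frame encoders this yields two pools, each sized for at most three job providers.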
On UNIX, you must link with libnuma in order for x265 to be NUMA aware.
On systems with a single socket or UNIX systems without libnuma this change
should be unnoticeable, except that --threads N is now --pools N.
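The per-node string accepted by --pools (documented in the cli.rst hunk of this patch) can be sketched as a standalone function. This is a minimal sketch that follows the parsing loop in allocThreadPools(); limitCoresPerNode() is a hypothetical name, and std::vector stands in for the fixed cpusPerNode array.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <vector>

// Hypothetical helper: returns how many logical cores each node may use.
// 'detected' holds the core count found on each NUMA node.
std::vector<int> limitCoresPerNode(const char* pools, std::vector<int> detected)
{
    if (!pools || !*pools)
        return detected;                  // NULL or "": use every core on every node

    const char* s = pools;
    for (std::size_t node = 0; node < detected.size(); node++)
    {
        if (!*s)
            detected[node] = 0;           // string exhausted: node is unused
        else if (*s == '-')
            detected[node] = 0;           // node explicitly disabled
        else if (*s == '*')
            break;                        // this node and all later ones keep every core
        else if (*s != '+')               // '+' keeps every core on this node
            detected[node] = std::min(std::atoi(s), detected[node]);

        while (*s && *s != ',')           // skip to the next comma-separated field
            ++s;
        if (*s == ',')
            ++s;
    }
    return detected;
}
```

On a system with four 8-core nodes, "-,*" leaves node 0 unused and keeps all cores on nodes 1 through 3, while "10" is clamped to the 8 cores actually present on node 0 and disables the other nodes.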
Since JobProviders are assigned to pools statically, each pool knows up front
which JobProviders it is servicing and can be more intelligent about which
provider needs the most help (I over P, P over B, B over b). The enqueue/dequeue
functions are no longer necessary.
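The priority rule described above can be sketched as a small selection function. It follows the scan in WorkerThread::threadMain() from this patch, but ProviderState and pickProvider() are hypothetical stand-ins, and the slice-type numbers used in the usage note are placeholders rather than the real X265_TYPE_* values.

```cpp
#include <cassert>
#include <vector>

// Stand-in for the two fields the worker loop reads from each JobProvider.
struct ProviderState
{
    int  sliceType;   // lower value means higher priority
    bool helpWanted;  // provider has pending work for another thread
};

enum { INVALID_SLICE_PRIORITY = 10 }; // larger than any slice type

// Returns the index of the provider a worker should service next, or
// 'current' when no provider with strictly higher priority wants help.
int pickProvider(const std::vector<ProviderState>& table, int current)
{
    // A provider that no longer wants help can be displaced by anyone.
    int curPriority = table[current].helpWanted ? table[current].sliceType
                                                : INVALID_SLICE_PRIORITY + 1;
    int next = current;
    for (int i = 0; i < (int)table.size(); i++)
    {
        if (table[i].helpWanted && table[i].sliceType < curPriority)
        {
            next = i;
            curPriority = table[i].sliceType;
        }
    }
    return next;
}
```

A worker currently helping a provider of type 5 moves to a provider of type 3 that wants help, but a worker already on the type 3 provider stays where it is while that provider still has work.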
FrameEncoders are now allocating the ThreadLocalData array for their thread
pool, allocating it after setting their thread affinity to their target NUMA
node so that memory should be associated to their socket. More work needs to be
done to mirror recon pictures between nodes so motion estimation does not have
to use remote external memory.
This commit introduces knowledge in the thread pool about BondedTaskGroups but
they are only implemented in the next commit. This commit will not compile by
itself.
diff -r d34f9d23b370 -r 4e72e207f865 doc/reST/cli.rst
--- a/doc/reST/cli.rst Thu Feb 19 13:06:32 2015 -0600
+++ b/doc/reST/cli.rst Thu Feb 19 09:45:59 2015 -0600
@@ -173,19 +173,52 @@
**Values:** any value between 8 and 16. Default is 0, auto-detect
-.. option:: --threads <integer>
+.. option:: --pools <string>, --numa-pools <string>
+
+ Comma separated list of threads per NUMA node. If "none", then no worker
+ pools are created and only frame parallelism is possible. If NULL or ""
+ (default) x265 will use all available threads on each NUMA node::
+
+ '+' is a special value indicating all cores detected on the node
+ '*' is a special value indicating all cores detected on the node and all remaining nodes
+ '-' is a special value indicating no cores on the node, same as '0'
+
+ example strings for a 4-node system::
+ "" - default, unspecified, all numa nodes are used for thread pools
+ "*" - same as default
+ "none" - no thread pools are created, only frame parallelism possible
+ "-" - same as "none"
+ "10" - allocate one pool, using up to 10 cores on node 0
+ "-,+" - allocate one pool, using all cores on node 1
+ "+,-,+" - allocate two pools, using all cores on nodes 0 and 2
+ "+,-,+,-" - allocate two pools, using all cores on nodes 0 and 2
+ "-,*" - allocate three pools, using all cores on nodes 1, 2 and 3
+ "8,8,8,8" - allocate four pools with up to 8 threads in each pool
+
+ The total number of threads will be determined by the number of threads
+ assigned to all nodes. The worker threads will each be given affinity for
+ their node; they will not be allowed to migrate between nodes, but they
+ will be allowed to move between CPU cores within their node.
+
+ If the three pool features :option:`--wpp`, :option:`--pmode` and
+ :option:`--pme` are all disabled, then :option:`--pools` is ignored
+ and no thread pools are created.
+
+ If "none" is specified, then all three of the thread pool features are
+ implicitly disabled.
+
+ Multiple thread pools will be allocated for any NUMA node with more than
+ 64 logical CPU cores. But any given thread pool will always use at most
+ one NUMA node.
+
+ Frame encoders are distributed between the available thread pools, and
+ the encoder will never generate more thread pools than frameNumThreads.
Number of threads to allocate for the worker thread pool This pool
is used for WPP and for distributed analysis and motion search:
- :option:`--wpp` :option:`--pmode` and :option:`--pme` respectively.
- If :option:`--threads` 1 is specified, then no thread pool is
- created. When no thread pool is created, all the thread pool
- features are implicitly disabled. If all the pool features are
- disabled by the user, then the pool is implicitly disabled.
-
- Default 0, one thread is allocated per detected hardware thread
- (logical CPU cores)
+ Default "", one thread is allocated per detected hardware thread
+ (logical CPU cores) and one thread pool per NUMA node.
.. option:: --wpp, --no-wpp
diff -r d34f9d23b370 -r 4e72e207f865 doc/reST/threading.rst
--- a/doc/reST/threading.rst Thu Feb 19 13:06:32 2015 -0600
+++ b/doc/reST/threading.rst Thu Feb 19 09:45:59 2015 -0600
@@ -2,41 +2,34 @@
Threading
*********
-Thread Pool
-===========
+Thread Pools
+============
-x265 creates a pool of worker threads and shares this thread pool
-with all encoders within the same process (it is process global, aka a
-singleton). The number of threads within the thread pool is determined
-by the encoder which first allocates the pool, which by definition is
-the first encoder created within each process.
+x265 creates one or more thread pools per encoder, one pool per NUMA
+node (typically a CPU socket). :option:`--pools` specifies the number of
+pools and the number of threads per pool the encoder will allocate. By
+default x265 allocates one thread per (hyperthreaded) CPU core on each
+NUMA node.
-:option:`--threads` specifies the number of threads the encoder will
-try to allocate for its thread pool. If the thread pool was already
-allocated this parameter is ignored. By default x265 allocates one
-thread per (hyperthreaded) CPU core in your system.
+If you are running multiple encoders on a system with multiple NUMA
+nodes, it is recommended to isolate each of them to a single node in
+order to avoid the NUMA overhead of remote memory access.
-Work distribution is job based. Idle worker threads ask their parent
-pool object for jobs to perform. When no jobs are available, idle
-worker threads block and consume no CPU cycles.
+Work distribution is job based. Idle worker threads scan the job
+providers assigned to their thread pool for jobs to perform. When no
+jobs are available, the idle worker threads block and consume no CPU
+cycles.
Objects which desire to distribute work to worker threads are known as
-job providers (and they derive from the JobProvider class). When job
-providers have work they enqueue themselves into the pool's provider
-list (and dequeue themselves when they no longer have work). The thread
+job providers (and they derive from the JobProvider class). The thread
pool has a method to **poke** awake a blocked idle thread, and job
providers are recommended to call this method when they make new jobs
available.
Worker jobs are not allowed to block except when abosultely necessary
-for data locking. If a job becomes blocked, the worker thread is
-expected to drop that job and go back to the pool and find more work.
-
-.. note::
-
- x265_cleanup() frees the process-global thread pool, allowing
- it to be reallocated if necessary, but only if no encoders are
- allocated at the time it is called.
+for data locking. If a job becomes blocked, the work function is
+expected to drop that job so the worker thread may go back to the pool
+and find more work.
Wavefront Parallel Processing
=============================
@@ -138,8 +131,8 @@
becomes blocked by a reference frame row being available, that frame's
wave-front becomes completely stalled and when the row becomes available
again it can take quite some time for the wave to be restarted, if it
-ever does. This makes WPP many times less effective when frame
-parallelism is in use.
+ever does. This makes WPP less effective when frame parallelism is in
+use.
:option:`--merange` can have a negative impact on frame parallelism. If
the range is too large, more rows of CTU lag must be added to ensure
diff -r d34f9d23b370 -r 4e72e207f865 source/common/framedata.h
--- a/source/common/framedata.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/common/framedata.h Thu Feb 19 09:45:59 2015 -0600
@@ -32,6 +32,7 @@
// private namespace
class PicYuv;
+class JobProvider;
/* Per-frame data that is used during encodes and referenced while the picture
* is available for reference. A FrameData instance is attached to a Frame as it
@@ -52,6 +53,7 @@
PicYuv* m_reconPic;
bool m_bHasReferences; /* used during DPB/RPS updates */
int m_frameEncoderID; /* the ID of the FrameEncoder encoding this frame */
+ JobProvider* m_jobProvider;
CUDataMemPool m_cuMemPool;
CUData* m_picCTU;
diff -r d34f9d23b370 -r 4e72e207f865 source/common/param.cpp
--- a/source/common/param.cpp Thu Feb 19 13:06:32 2015 -0600
+++ b/source/common/param.cpp Thu Feb 19 09:45:59 2015 -0600
@@ -100,7 +100,6 @@
/* Applying default values to all elements in the param structure */
param->cpuid = x265::cpu_detect();
param->bEnableWavefront = 1;
- param->poolNumThreads = 0;
param->frameNumThreads = 0;
param->logLevel = X265_LOG_INFO;
@@ -545,7 +544,6 @@
}
}
}
- OPT("threads") p->poolNumThreads = atoi(value);
OPT("frame-threads") p->frameNumThreads = atoi(value);
OPT("pmode") p->bDistributeModeAnalysis = atobool(value);
OPT("pme") p->bDistributeMotionEstimation = atobool(value);
@@ -821,6 +819,7 @@
OPT("stats") p->rc.statFileName = strdup(value);
OPT("csv") p->csvfn = strdup(value);
OPT("scaling-list") p->scalingLists = strdup(value);
+ OPT2("pools", "numa-pools") p->numaPools = strdup(value);
OPT("lambda-file") p->rc.lambdaFileName = strdup(value);
OPT("analysis-file") p->analysisFileName = strdup(value);
else
diff -r d34f9d23b370 -r 4e72e207f865 source/common/threadpool.cpp
--- a/source/common/threadpool.cpp Thu Feb 19 13:06:32 2015 -0600
+++ b/source/common/threadpool.cpp Thu Feb 19 09:45:59 2015 -0600
@@ -27,115 +27,65 @@
#include <new>
+#if X86_64
+
+#ifdef __GNUC__
+
+#define SLEEPBITMAP_CTZ(id, x) id = (unsigned long)__builtin_ctzll(x)
+#define SLEEPBITMAP_OR(ptr, mask) __sync_fetch_and_or(ptr, mask)
+#define SLEEPBITMAP_AND(ptr, mask) __sync_fetch_and_and(ptr, mask)
+
+#elif defined(_MSC_VER)
+
+#define SLEEPBITMAP_CTZ(id, x) _BitScanForward64(&id, x)
+#define SLEEPBITMAP_OR(ptr, mask) InterlockedOr64((volatile LONG64*)ptr, (LONG64)mask)
+#define SLEEPBITMAP_AND(ptr, mask) InterlockedAnd64((volatile LONG64*)ptr, (LONG64)mask)
+
+#endif // ifdef __GNUC__
+
+#else
+
+/* use 32-bit primitives defined in threading.h */
+#define SLEEPBITMAP_CTZ CTZ
+#define SLEEPBITMAP_OR ATOMIC_OR
+#define SLEEPBITMAP_AND ATOMIC_AND
+
+#endif
+
#if MACOS
#include <sys/param.h>
#include <sys/sysctl.h>
#endif
+#if HAVE_LIBNUMA
+#include <numa.h>
+#endif
namespace x265 {
// x265 private namespace
-class ThreadPoolImpl;
-
-class PoolThread : public Thread
+class WorkerThread : public Thread
{
private:
- ThreadPoolImpl &m_pool;
+ ThreadPool& m_pool;
+ int m_id;
+ Event m_wakeEvent;
- PoolThread& operator =(const PoolThread&);
-
- int m_id;
-
- bool m_dirty;
-
- bool m_exited;
-
- Event m_wakeEvent;
+ WorkerThread& operator =(const WorkerThread&);
public:
- PoolThread(ThreadPoolImpl& pool, int id)
- : m_pool(pool)
- , m_id(id)
- , m_dirty(false)
- , m_exited(false)
- {
- }
+ JobProvider* m_curJobProvider;
+ BondedTaskGroup* m_bondMaster;
- bool isDirty() const { return m_dirty; }
-
- void markDirty() { m_dirty = true; }
-
- bool isExited() const { return m_exited; }
-
- void poke() { m_wakeEvent.trigger(); }
-
- virtual ~PoolThread() {}
+ WorkerThread(ThreadPool& pool, int id) : m_pool(pool), m_id(id) {}
+ virtual ~WorkerThread() {}
void threadMain();
+ void awaken() { m_wakeEvent.trigger(); }
};
-class ThreadPoolImpl : public ThreadPool
-{
-private:
-
- bool m_ok;
- int m_referenceCount;
- int m_numThreads;
- int m_numSleepMapWords;
- PoolThread *m_threads;
- volatile uint32_t *m_sleepMap;
-
- /* Lock for write access to the provider lists. Threads are
- * always allowed to read m_firstProvider and follow the
- * linked list. Providers must zero their m_nextProvider
- * pointers before removing themselves from this list */
- Lock m_writeLock;
-
-public:
-
- static ThreadPoolImpl *s_instance;
- static Lock s_createLock;
-
- JobProvider *m_firstProvider;
- JobProvider *m_lastProvider;
-
-public:
-
- ThreadPoolImpl(int numthreads);
-
- virtual ~ThreadPoolImpl();
-
- ThreadPoolImpl *AddReference()
- {
- m_referenceCount++;
-
- return this;
- }
-
- void markThreadAsleep(int id);
-
- void waitForAllIdle();
-
- int getThreadCount() const { return m_numThreads; }
-
- bool IsValid() const { return m_ok; }
-
- void release();
-
- void Stop();
-
- void enqueueJobProvider(JobProvider &);
-
- void dequeueJobProvider(JobProvider &);
-
- void FlushProviderList();
-
- void pokeIdleThread();
-};
-
-void PoolThread::threadMain()
+void WorkerThread::threadMain()
{
THREAD_NAME("Worker", m_id);
@@ -145,286 +95,350 @@
__attribute__((unused)) int val = nice(10);
#endif
- while (m_pool.IsValid())
+ m_pool.setCurrentThreadAffinity();
+
+ sleepbitmap_t idBit = (sleepbitmap_t)1 << m_id;
+ m_curJobProvider = m_pool.m_jpTable[0];
+ m_bondMaster = NULL;
+
+ SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
+ SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
+ m_wakeEvent.wait();
+
+ while (m_pool.m_isActive)
{
- /* Walk list of job providers, looking for work */
- JobProvider *cur = m_pool.m_firstProvider;
- while (cur)
+ if (m_bondMaster)
{
- // FindJob() may perform actual work and return true. If
- // it does we restart the job search
- if (cur->findJob(m_id) == true)
- break;
-
- cur = cur->m_nextProvider;
+ m_bondMaster->processTasks(m_id);
+ m_bondMaster->m_exitedPeerCount.incr();
+ m_bondMaster = NULL;
}
- // this thread has reached the end of the provider list
- m_dirty = false;
+ do
+ {
+ /* do pending work for current job provider */
+ m_curJobProvider->findJob(m_id);
- if (cur == NULL)
+ /* if the current job provider still wants help, only switch to a
+ * higher priority provider (lower slice type). Else take the first
+ * available job provider with the highest priority */
+ int curPriority = (m_curJobProvider->m_helpWanted) ? m_curJobProvider->m_sliceType :
+ INVALID_SLICE_PRIORITY + 1;
+ int nextProvider = -1;
+ for (int i = 0; i < m_pool.m_numProviders; i++)
+ {
+ if (m_pool.m_jpTable[i]->m_helpWanted &&
+ m_pool.m_jpTable[i]->m_sliceType < curPriority)
+ {
+ nextProvider = i;
+ curPriority = m_pool.m_jpTable[i]->m_sliceType;
+ }
+ }
+ if (nextProvider != -1 && m_curJobProvider != m_pool.m_jpTable[nextProvider])
+ {
+ SLEEPBITMAP_AND(&m_curJobProvider->m_ownerBitmap, ~idBit);
+ m_curJobProvider = m_pool.m_jpTable[nextProvider];
+ SLEEPBITMAP_OR(&m_curJobProvider->m_ownerBitmap, idBit);
+ }
+ }
+ while (m_curJobProvider->m_helpWanted);
+
+ /* While the worker sleeps, a job-provider or bond-group may acquire this
+ * worker's sleep bitmap bit. Once acquired, that thread may modify
+ * m_bondMaster or m_curJobProvider, then waken the thread */
+ SLEEPBITMAP_OR(&m_pool.m_sleepBitmap, idBit);
+ m_wakeEvent.wait();
+ }
+}
+
+void JobProvider::tryWakeOne()
+{
+ int id = m_pool->tryAcquireSleepingThread(m_ownerBitmap, ALL_POOL_THREADS);
+ if (id < 0)
+ {
+ m_helpWanted = true;
+ return;
+ }
+
+ WorkerThread& worker = m_pool->m_workers[id];
+ if (worker.m_curJobProvider != this) /* poaching */
+ {
+ sleepbitmap_t bit = (sleepbitmap_t)1 << id;
+ SLEEPBITMAP_AND(&worker.m_curJobProvider->m_ownerBitmap, ~bit);
+ worker.m_curJobProvider = this;
+ SLEEPBITMAP_OR(&worker.m_curJobProvider->m_ownerBitmap, bit);
+ }
+ worker.awaken();
+}
+
+int ThreadPool::tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap)
+{
+ unsigned long id;
+
+ sleepbitmap_t masked = m_sleepBitmap & firstTryBitmap;
+ while (masked)
+ {
+ SLEEPBITMAP_CTZ(id, masked);
+
+ sleepbitmap_t bit = (sleepbitmap_t)1 << id;
+ if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
+ return (int)id;
+
+ masked = m_sleepBitmap & firstTryBitmap;
+ }
+
+ masked = m_sleepBitmap & secondTryBitmap;
+ while (masked)
+ {
+ SLEEPBITMAP_CTZ(id, masked);
+
+ sleepbitmap_t bit = (sleepbitmap_t)1 << id;
+ if (SLEEPBITMAP_AND(&m_sleepBitmap, ~bit) & bit)
+ return (int)id;
+
+ masked = m_sleepBitmap & secondTryBitmap;
+ }
+
+ return -1;
+}
+
+int ThreadPool::tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master)
+{
+ int bondCount = 0;
+ do
+ {
+ int id = tryAcquireSleepingThread(peerBitmap, 0);
+ if (id < 0)
+ return bondCount;
+
+ m_workers[id].m_bondMaster = &master;
+ m_workers[id].awaken();
+ bondCount++;
+ }
+ while (bondCount < maxPeers);
+
+ return bondCount;
+}
+
+ThreadPool* ThreadPool::allocThreadPools(x265_param* p, int& numPools)
+{
+ enum { MAX_NODE_NUM = 127 };
+ int cpusPerNode[MAX_NODE_NUM + 1];
+
+ memset(cpusPerNode, 0, sizeof(cpusPerNode));
+ int numNumaNodes = X265_MIN(getNumaNodeCount(), MAX_NODE_NUM);
+ int cpuCount = getCpuCount();
+ bool bNumaSupport = false;
+
+#if _WIN32_WINNT >= 0x0601
+ bNumaSupport = true;
+#elif HAVE_LIBNUMA
+ bNumaSupport = numa_available() >= 0;
+#endif
+
+
+ for (int i = 0; i < cpuCount; i++)
+ {
+#if _WIN32_WINNT >= 0x0601
+ UCHAR node;
+ if (GetNumaProcessorNode((UCHAR)i, &node))
+ cpusPerNode[X265_MIN(node, MAX_NODE_NUM)]++;
+ else
+#elif HAVE_LIBNUMA
+ if (bNumaSupport)
+ cpusPerNode[X265_MIN(numa_node_of_cpu(i), MAX_NODE_NUM)]++;
+ else
+#endif
+ cpusPerNode[0]++;
+ }
+
+ if (bNumaSupport && p->logLevel >= X265_LOG_DEBUG)
+ for (int i = 0; i < numNumaNodes; i++)
+ x265_log(p, X265_LOG_DEBUG, "detected NUMA node %d with %d logical cores\n", i, cpusPerNode[i]);
+
+ /* limit nodes based on param->numaPools */
+ if (p->numaPools && *p->numaPools)
+ {
+ char *nodeStr = p->numaPools;
+ for (int i = 0; i < numNumaNodes; i++)
{
- m_pool.markThreadAsleep(m_id);
- m_wakeEvent.wait();
+ if (!*nodeStr)
+ {
+ cpusPerNode[i] = 0;
+ continue;
+ }
+ else if (*nodeStr == '-')
+ cpusPerNode[i] = 0;
+ else if (*nodeStr == '*')
+ break;
+ else if (*nodeStr == '+')
+ ;
+ else
+ {
+ int count = atoi(nodeStr);
+ cpusPerNode[i] = X265_MIN(count, cpusPerNode[i]);
+ }
+
+ /* consume current node string, comma, and white-space */
+ while (*nodeStr && *nodeStr != ',')
+ ++nodeStr;
+ if (*nodeStr == ',' || *nodeStr == ' ')
+ ++nodeStr;
}
}
- m_exited = true;
+ numPools = 0;
+ for (int i = 0; i < numNumaNodes; i++)
+ {
+ if (bNumaSupport)
+ x265_log(p, X265_LOG_DEBUG, "NUMA node %d may use %d logical cores\n", i, cpusPerNode[i]);
+ if (cpusPerNode[i])
+ numPools += (cpusPerNode[i] + MAX_POOL_THREADS - 1) / MAX_POOL_THREADS;
+ }
+
+ if (!numPools)
+ return NULL;
+
+ if (numPools > p->frameNumThreads)
+ {
+ x265_log(p, X265_LOG_DEBUG, "Reducing number of thread pools for frame thread count\n");
+ numPools = X265_MAX(p->frameNumThreads / 2, 1);
+ }
+
+ ThreadPool *pools = new ThreadPool[numPools];
+ if (pools)
+ {
+ int maxProviders = (p->frameNumThreads + 1 + numPools - 1) / numPools; /* +1 is Lookahead */
+ int node = 0;
+ for (int i = 0; i < numPools; i++)
+ {
+ while (!cpusPerNode[node])
+ node++;
+ int cores = X265_MIN(MAX_POOL_THREADS, cpusPerNode[node]);
+ if (!pools[i].create(cores, maxProviders, node))
+ {
+ delete [] pools;
+ numPools = 0;
+ return NULL;
+ }
+ if (bNumaSupport)
+ x265_log(p, X265_LOG_INFO, "Thread pool %d using %d threads on NUMA node %d\n", i, cores, node);
+ else
+ x265_log(p, X265_LOG_INFO, "Thread pool created using %d threads\n", cores);
+ cpusPerNode[node] -= cores;
+ }
+ }
+ else
+ numPools = 0;
+ return pools;
}
-void ThreadPoolImpl::markThreadAsleep(int id)
+ThreadPool::ThreadPool()
{
- int word = id >> 5;
- uint32_t bit = 1 << (id & 31);
-
- ATOMIC_OR(&m_sleepMap[word], bit);
+ memset(this, 0, sizeof(*this));
}
-void ThreadPoolImpl::pokeIdleThread()
+bool ThreadPool::create(int numThreads, int maxProviders, int node)
{
- /* Find a bit in the sleeping thread bitmap and poke it awake, do
- * not give up until a thread is awakened or all of them are awake */
- for (int i = 0; i < m_numSleepMapWords; i++)
+ X265_CHECK(numThreads <= MAX_POOL_THREADS, "a single thread pool cannot have more than MAX_POOL_THREADS threads\n");
+
+ m_numaNode = node;
+ m_numWorkers = numThreads;
+
+ m_workers = X265_MALLOC(WorkerThread, numThreads);
+ /* placement new initialization */
+ if (m_workers)
+ for (int i = 0; i < numThreads; i++)
+ new (m_workers + i)WorkerThread(*this, i);
+
+ m_jpTable = X265_MALLOC(JobProvider*, maxProviders);
+ m_numProviders = 0;
+
+ return m_workers && m_jpTable;
+}
+
+bool ThreadPool::start()
+{
+ m_isActive = true;
+ for (int i = 0; i < m_numWorkers; i++)
{
- uint32_t oldval = m_sleepMap[i];
- while (oldval)
+ if (!m_workers[i].start())
{
- unsigned long id;
- CTZ(id, oldval);
-
- uint32_t bit = 1 << id;
- if (ATOMIC_AND(&m_sleepMap[i], ~bit) & bit)
- {
- m_threads[i * 32 + id].poke();
- return;
- }
-
- oldval = m_sleepMap[i];
+ m_isActive = false;
+ return false;
}
}
+ return true;
}
-ThreadPoolImpl *ThreadPoolImpl::s_instance;
-Lock ThreadPoolImpl::s_createLock;
+ThreadPool::~ThreadPool()
+{
+ if (m_workers)
+ {
+ m_isActive = false;
+ for (int i = 0; i < m_numWorkers; i++)
+ {
+ m_workers[i].awaken();
+ m_workers[i].stop();
+ m_workers[i].~WorkerThread();
+ }
+
+ X265_FREE(m_workers);
+ }
+
+ X265_FREE(m_jpTable);
+}
+
+void ThreadPool::setCurrentThreadAffinity()
+{
+ setThreadNodeAffinity(m_numaNode);
+}
/* static */
-ThreadPool *ThreadPool::allocThreadPool(int numthreads)
+void ThreadPool::setThreadNodeAffinity(int numaNode)
{
- if (ThreadPoolImpl::s_instance)
- return ThreadPoolImpl::s_instance->AddReference();
-
- /* acquire the lock to create the instance */
- ThreadPoolImpl::s_createLock.acquire();
-
- if (ThreadPoolImpl::s_instance)
- /* pool was allocated while we waited for the lock */
- ThreadPoolImpl::s_instance->AddReference();
- else
- ThreadPoolImpl::s_instance = new ThreadPoolImpl(numthreads);
- ThreadPoolImpl::s_createLock.release();
-
- return ThreadPoolImpl::s_instance;
+#if _WIN32_WINNT >= 0x0601
+ GROUP_AFFINITY groupAffinity;
+ if (GetNumaNodeProcessorMaskEx((USHORT)numaNode, &groupAffinity))
+ {
+ if (SetThreadAffinityMask(GetCurrentThread(), (DWORD_PTR)groupAffinity.Mask))
+ return;
+ }
+ x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity to NUMA node %d\n", numaNode);
+#elif HAVE_LIBNUMA
+ if (numa_available() >= 0)
+ {
+ numa_run_on_node(numaNode);
+ numa_set_preferred(numaNode);
+ numa_set_localalloc();
+ return;
+ }
+ x265_log(NULL, X265_LOG_ERROR, "unable to set thread affinity to NUMA node %d\n", numaNode);
+#else
+ (void)numaNode;
+#endif
}
-ThreadPool *ThreadPool::getThreadPool()
+/* static */
+int ThreadPool::getNumaNodeCount()
{
- X265_CHECK(ThreadPoolImpl::s_instance, "getThreadPool() called prior to allocThreadPool()\n");
- return ThreadPoolImpl::s_instance;
+#if _WIN32_WINNT >= 0x0601
+ ULONG num = 1;
+ if (GetNumaHighestNodeNumber(&num))
+ num++;
+ return (int)num;
+#elif HAVE_LIBNUMA
+ if (numa_available() >= 0)
+ return numa_max_node() + 1;
+ else
+ return 1;
+#else
+ return 1;
+#endif
}
-void ThreadPoolImpl::release()
-{
- if (--m_referenceCount == 0)
- {
- X265_CHECK(this == ThreadPoolImpl::s_instance, "multiple thread pool instances detected\n");
- ThreadPoolImpl::s_instance = NULL;
- this->Stop();
- delete this;
- }
-}
-
-ThreadPoolImpl::ThreadPoolImpl(int numThreads)
- : m_ok(false)
- , m_referenceCount(1)
- , m_firstProvider(NULL)
- , m_lastProvider(NULL)
-{
- m_numSleepMapWords = (numThreads + 31) >> 5;
- m_sleepMap = X265_MALLOC(uint32_t, m_numSleepMapWords);
-
- char *buffer = (char*)X265_MALLOC(PoolThread, numThreads);
- m_threads = reinterpret_cast<PoolThread*>(buffer);
- m_numThreads = numThreads;
-
- if (m_threads && m_sleepMap)
- {
- for (int i = 0; i < m_numSleepMapWords; i++)
- m_sleepMap[i] = 0;
-
- m_ok = true;
- int i;
- for (i = 0; i < numThreads; i++)
- {
- new (buffer)PoolThread(*this, i);
- buffer += sizeof(PoolThread);
- if (!m_threads[i].start())
- {
- m_ok = false;
- break;
- }
- }
-
- if (m_ok)
- waitForAllIdle();
- else
- {
- // stop threads that did start up
- for (int j = 0; j < i; j++)
- {
- m_threads[j].poke();
- m_threads[j].stop();
- }
- }
- }
-}
-
-void ThreadPoolImpl::waitForAllIdle()
-{
- if (!m_ok)
- return;
-
- int id = 0;
- do
- {
- int word = id >> 5;
- uint32_t bit = 1 << (id & 31);
- if (m_sleepMap[word] & bit)
- id++;
- else
- {
- GIVE_UP_TIME();
- }
- }
- while (id < m_numThreads);
-}
-
-void ThreadPoolImpl::Stop()
-{
- if (m_ok)
- {
- waitForAllIdle();
-
- // set invalid flag, then wake them up so they exit their main func
- m_ok = false;
- for (int i = 0; i < m_numThreads; i++)
- {
- m_threads[i].poke();
- m_threads[i].stop();
- }
- }
-}
-
-ThreadPoolImpl::~ThreadPoolImpl()
-{
- X265_FREE((void*)m_sleepMap);
-
- if (m_threads)
- {
- // cleanup thread handles
- for (int i = 0; i < m_numThreads; i++)
- m_threads[i].~PoolThread();
-
- X265_FREE(reinterpret_cast<char*>(m_threads));
- }
-}
-
-void ThreadPoolImpl::enqueueJobProvider(JobProvider &p)
-{
- // only one list writer at a time
- ScopedLock l(m_writeLock);
-
- p.m_nextProvider = NULL;
- p.m_prevProvider = m_lastProvider;
- m_lastProvider = &p;
-
- if (p.m_prevProvider)
- p.m_prevProvider->m_nextProvider = &p;
- else
- m_firstProvider = &p;
-}
-
-void ThreadPoolImpl::dequeueJobProvider(JobProvider &p)
-{
- // only one list writer at a time
- ScopedLock l(m_writeLock);
-
- // update pool entry pointers first
- if (m_firstProvider == &p)
- m_firstProvider = p.m_nextProvider;
-
- if (m_lastProvider == &p)
- m_lastProvider = p.m_prevProvider;
-
- // extract self from doubly linked lists
- if (p.m_nextProvider)
- p.m_nextProvider->m_prevProvider = p.m_prevProvider;
-
- if (p.m_prevProvider)
- p.m_prevProvider->m_nextProvider = p.m_nextProvider;
-
- p.m_nextProvider = NULL;
- p.m_prevProvider = NULL;
-}
-
-/* Ensure all threads have made a full pass through the provider list, ensuring
- * dequeued providers are safe for deletion. */
-void ThreadPoolImpl::FlushProviderList()
-{
- for (int i = 0; i < m_numThreads; i++)
- {
- m_threads[i].markDirty();
- m_threads[i].poke();
- }
-
- int i;
- do
- {
- for (i = 0; i < m_numThreads; i++)
- {
- if (m_threads[i].isDirty())
- {
- GIVE_UP_TIME();
- break;
- }
- }
- }
- while (i < m_numThreads);
-}
-
-void JobProvider::flush()
-{
- if (m_nextProvider || m_prevProvider)
- dequeue();
- dynamic_cast<ThreadPoolImpl*>(m_pool)->FlushProviderList();
-}
-
-void JobProvider::enqueue()
-{
- // Add this provider to the end of the thread pool's job provider list
- X265_CHECK(!m_nextProvider && !m_prevProvider && m_pool, "job provider was already queued\n");
- m_pool->enqueueJobProvider(*this);
- m_pool->pokeIdleThread();
-}
-
-void JobProvider::dequeue()
-{
- // Remove this provider from the thread pool's job provider list
- m_pool->dequeueJobProvider(*this);
- // Ensure no jobs were missed while the provider was being removed
- m_pool->pokeIdleThread();
-}
-
-int getCpuCount()
+/* static */
+int ThreadPool::getCpuCount()
{
#if _WIN32
SYSTEM_INFO sysinfo;
@@ -450,8 +464,9 @@
}
return count;
-#else // if _WIN32
+#else
return 2; // default to 2 threads, everywhere else
-#endif // if _WIN32
+#endif
}
+
} // end namespace x265
diff -r d34f9d23b370 -r 4e72e207f865 source/common/threadpool.h
--- a/source/common/threadpool.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/common/threadpool.h Thu Feb 19 09:45:59 2015 -0600
@@ -25,84 +25,84 @@
#define X265_THREADPOOL_H
#include "common.h"
+#include "threading.h"
namespace x265 {
// x265 private namespace
class ThreadPool;
+class WorkerThread;
+class BondedTaskGroup;
-int getCpuCount();
+#if X86_64
+typedef uint64_t sleepbitmap_t;
+#else
+typedef uint32_t sleepbitmap_t;
+#endif
-// Any class that wants to distribute work to the thread pool must
-// derive from JobProvider and implement FindJob().
+static const sleepbitmap_t ALL_POOL_THREADS = (sleepbitmap_t)-1;
+enum { MAX_POOL_THREADS = sizeof(sleepbitmap_t) * 8 };
+enum { INVALID_SLICE_PRIORITY = 10 }; // a value larger than any X265_TYPE_* macro
+
+// Frame level job providers. FrameEncoder and Lookahead derive from
+// this class and implement findJob()
class JobProvider
{
-protected:
-
- ThreadPool *m_pool;
-
- JobProvider *m_nextProvider;
- JobProvider *m_prevProvider;
-
public:
- JobProvider(ThreadPool *p) : m_pool(p), m_nextProvider(0), m_prevProvider(0) {}
+ ThreadPool* m_pool;
+ sleepbitmap_t m_ownerBitmap;
+ int m_jpId;
+ int m_sliceType;
+ bool m_helpWanted;
+ bool m_isFrameEncoder; /* rather ugly hack, but nothing better presents itself */
+
+ JobProvider()
+ : m_pool(NULL)
+ , m_ownerBitmap(0)
+ , m_jpId(-1)
+ , m_sliceType(INVALID_SLICE_PRIORITY)
+ , m_helpWanted(false)
+ , m_isFrameEncoder(false)
+ {}
virtual ~JobProvider() {}
- void setThreadPool(ThreadPool *p) { m_pool = p; }
+ // Worker threads will call this method to perform work
+ virtual void findJob(int workerThreadId) = 0;
- // Register this job provider with the thread pool, jobs are available
- void enqueue();
-
- // Remove this job provider from the thread pool, all jobs complete
- void dequeue();
-
- // Worker threads will call this method to find a job. Must return true if
- // work was completed. False if no work was available.
- virtual bool findJob(int threadId) = 0;
-
- // All derived objects that call Enqueue *MUST* call flush before allowing
- // their object to be destroyed, otherwise you will see random crashes involving
- // partially freed vtables and you will be unhappy
- void flush();
-
- friend class ThreadPoolImpl;
- friend class PoolThread;
+ // Will awaken one idle thread, preferring a thread which most recently
+ // performed work for this provider.
+ void tryWakeOne();
};
-// Abstract interface to ThreadPool. Each encoder instance should call
-// AllocThreadPool() to get a handle to the singleton object and then make
-// it available to their job provider structures (wave-front frame encoders,
-// etc).
class ThreadPool
{
-protected:
-
- // Destructor is inaccessable, force the use of reference counted Release()
- ~ThreadPool() {}
-
- virtual void enqueueJobProvider(JobProvider &) = 0;
-
- virtual void dequeueJobProvider(JobProvider &) = 0;
-
public:
- // When numthreads == 0, a default thread count is used. A request may grow
- // an existing pool but it will never shrink.
- static ThreadPool *allocThreadPool(int numthreads = 0);
+ sleepbitmap_t m_sleepBitmap;
+ int m_numProviders;
+ int m_numWorkers;
+ int m_numaNode;
+ bool m_isActive;
- static ThreadPool *getThreadPool();
+ JobProvider** m_jpTable;
+ WorkerThread* m_workers;
- virtual void pokeIdleThread() = 0;
+ ThreadPool();
+ ~ThreadPool();
- // The pool is reference counted so all calls to AllocThreadPool() should be
- // followed by a call to Release()
- virtual void release() = 0;
+ bool create(int numThreads, int maxProviders, int node);
+ bool start();
+ void setCurrentThreadAffinity();
+ int tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap);
+ int tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master);
- virtual int getThreadCount() const = 0;
+ static ThreadPool* allocThreadPools(x265_param* p, int& numPools);
- friend class JobProvider;
+ static int getCpuCount();
+ static int getNumaNodeCount();
+ static void setThreadNodeAffinity(int node);
};
} // end namespace x265
diff -r d34f9d23b370 -r 4e72e207f865 source/common/wavefront.cpp
--- a/source/common/wavefront.cpp Thu Feb 19 13:06:32 2015 -0600
+++ b/source/common/wavefront.cpp Thu Feb 19 09:45:59 2015 -0600
@@ -60,7 +60,6 @@
{
uint32_t bit = 1 << (row & 31);
ATOMIC_OR(&m_internalDependencyBitmap[row >> 5], bit);
- if (m_pool) m_pool->pokeIdleThread();
}
void WaveFront::enableRow(int row)
@@ -80,11 +79,11 @@
return !!(ATOMIC_AND(&m_internalDependencyBitmap[row >> 5], ~bit) & bit);
}
-bool WaveFront::findJob(int threadId)
+void WaveFront::findJob(int threadId)
{
unsigned long id;
- // thread safe
+ /* Loop over each word until all available rows are finished */
for (int w = 0; w < m_numWords; w++)
{
uint32_t oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
@@ -97,15 +96,14 @@
{
/* we cleared the bit, we get to process the row */
processRow(w * 32 + id, threadId);
- return true;
+ m_helpWanted = true;
+ return; /* check for a higher priority task */
}
- // some other thread cleared the bit, try another bit
oldval = m_internalDependencyBitmap[w] & m_externalDependencyBitmap[w];
}
}
- // made it through the bitmap without finding any enqueued rows
- return false;
+ m_helpWanted = false;
}
}
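The row-claiming idea in the findJob() hunk above can be sketched in isolation. This is a minimal illustration, not the x265 code: RowBitmap, enqueue() and claimLowestRow() are hypothetical names, std::atomic stands in for the ATOMIC_OR/ATOMIC_AND macros, and only a single 32-row word is modelled. It assumes, as the `& bit` test in the patch suggests, that the atomic AND yields the previous value of the word.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for one 32-row word of WaveFront's two bitmaps.
struct RowBitmap
{
    std::atomic<uint32_t> internal{0}; // rows queued by the frame encoder
    std::atomic<uint32_t> external{0}; // rows whose outside dependencies are met

    // Mark a row as both queued and enabled.
    void enqueue(int row)
    {
        assert(row >= 0 && row < 32);
        uint32_t bit = 1u << row;
        internal.fetch_or(bit);
        external.fetch_or(bit);
    }

    // Index of the lowest set bit; the caller guarantees v != 0.
    static int lowestSetBit(uint32_t v)
    {
        int i = 0;
        while (!(v & 1u))
        {
            v >>= 1;
            i++;
        }
        return i;
    }

    // Try to claim the lowest ready row. Returns its index, or -1 if no row
    // is ready. Several threads may race here; only the thread that observes
    // the bit still set in the value returned by fetch_and owns the row.
    int claimLowestRow()
    {
        uint32_t ready = internal.load() & external.load();
        while (ready)
        {
            int id = lowestSetBit(ready);
            uint32_t bit = 1u << id;
            if (internal.fetch_and(~bit) & bit)
                return id; // we cleared the bit, we get to process the row
            // another thread claimed it first; re-read and try another bit
            ready = internal.load() & external.load();
        }
        return -1;
    }
};
```

In the patch, the claiming thread processes the row, sets m_helpWanted and returns so the pool can check for a higher priority provider. The sketch only models the claim itself.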
diff -r d34f9d23b370 -r 4e72e207f865 source/common/wavefront.h
--- a/source/common/wavefront.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/common/wavefront.h Thu Feb 19 09:45:59 2015 -0600
@@ -53,10 +53,9 @@
public:
- WaveFront(ThreadPool *pool)
- : JobProvider(pool)
- , m_internalDependencyBitmap(0)
- , m_externalDependencyBitmap(0)
+ WaveFront()
+ : m_internalDependencyBitmap(NULL)
+ , m_externalDependencyBitmap(NULL)
{}
virtual ~WaveFront();
@@ -86,8 +85,8 @@
// WaveFront's implementation of JobProvider::findJob. Consults
// m_queuedBitmap and calls ProcessRow(row) for lowest numbered queued row
- // or returns false
- bool findJob(int threadId);
+ // then returns; m_helpWanted is cleared when no queued rows remain
+ void findJob(int threadId);
// Start or resume encode processing of this row, must be implemented by
// derived classes.
diff -r d34f9d23b370 -r 4e72e207f865 source/encoder/encoder.cpp
--- a/source/encoder/encoder.cpp Thu Feb 19 13:06:32 2015 -0600
+++ b/source/encoder/encoder.cpp Thu Feb 19 09:45:59 2015 -0600
@@ -78,7 +78,6 @@
m_buOffsetY = NULL;
m_buOffsetC = NULL;
m_threadPool = NULL;
- m_numThreadLocalData = 0;
m_analysisFile = NULL;
for (int i = 0; i < X265_MAX_FRAME_THREADS; i++)
m_frameEncoder[i] = NULL;
@@ -102,38 +101,16 @@
if (rows == 1 || cols < 3)
p->bEnableWavefront = 0;
- int poolThreadCount = p->poolNumThreads ? p->poolNumThreads : getCpuCount();
+ bool allowPools = !p->numaPools || strcmp(p->numaPools, "none");
// Trim the thread pool if --wpp, --pme, and --pmode are disabled
if (!p->bEnableWavefront && !p->bDistributeModeAnalysis && !p->bDistributeMotionEstimation)
- poolThreadCount = 0;
-
- if (poolThreadCount > 1)
- {
- m_threadPool = ThreadPool::allocThreadPool(poolThreadCount);
- poolThreadCount = m_threadPool->getThreadCount();
- }
- else
- poolThreadCount = 0;
-
- if (!poolThreadCount)
- {
- // issue warnings if any of these features were requested
- if (p->bEnableWavefront)
- x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --wpp disabled\n");
- if (p->bDistributeMotionEstimation)
- x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pme disabled\n");
- if (p->bDistributeModeAnalysis)
- x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pmode disabled\n");
-
- // disable all pool features if the thread pool is disabled or unusable.
- p->bEnableWavefront = p->bDistributeModeAnalysis = p->bDistributeMotionEstimation = 0;
- }
+ allowPools = false;
if (!p->frameNumThreads)
{
// auto-detect frame threads
- int cpuCount = getCpuCount();
+ int cpuCount = ThreadPool::getCpuCount();
if (!p->bEnableWavefront)
p->frameNumThreads = X265_MIN(cpuCount, (rows + 1) / 2);
else if (cpuCount >= 32)
@@ -148,14 +125,51 @@
p->frameNumThreads = 1;
}
- x265_log(p, X265_LOG_INFO, "WPP streams / frame threads / pool : %d / %d / %d%s%s\n",
- p->bEnableWavefront ? rows : 0, p->frameNumThreads, poolThreadCount,
- p->bDistributeMotionEstimation ? " / pme" : "", p->bDistributeModeAnalysis ? " / pmode" : "");
+ m_numPools = 0;
+ if (allowPools)
+ m_threadPool = ThreadPool::allocThreadPools(p, m_numPools);
+
+ if (!m_numPools)
+ {
+ // issue warnings if any of these features were requested
+ if (p->bEnableWavefront)
+ x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --wpp disabled\n");
+ if (p->bDistributeMotionEstimation)
+ x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pme disabled\n");
+ if (p->bDistributeModeAnalysis)
+ x265_log(p, X265_LOG_WARNING, "No thread pool allocated, --pmode disabled\n");
+
+ // disable all pool features if the thread pool is disabled or unusable.
+ p->bEnableWavefront = p->bDistributeModeAnalysis = p->bDistributeMotionEstimation = 0;
+ }
+
+ char buf[128];
+ int len = 0;
+ if (p->bEnableWavefront)
+ len += sprintf(buf + len, "wpp(%d rows)", rows);
+ if (p->bDistributeModeAnalysis)
+ len += sprintf(buf + len, "%spmode", len ? "+" : "");
+ if (p->bDistributeMotionEstimation)
+ len += sprintf(buf + len, "%spme", len ? "+" : "");
+ if (!len)
+ strcpy(buf, "none");
+
+ x265_log(p, X265_LOG_INFO, "frame threads / pool features : %d / %s\n", p->frameNumThreads, buf);
for (int i = 0; i < m_param->frameNumThreads; i++)
+ m_frameEncoder[i] = new FrameEncoder;
+
+ if (m_numPools)
{
- m_frameEncoder[i] = new FrameEncoder;
- m_frameEncoder[i]->setThreadPool(m_threadPool);
+ for (int i = 0; i < m_param->frameNumThreads; i++)
+ {
+ int pool = i % m_numPools;
+ m_frameEncoder[i]->m_pool = &m_threadPool[pool];
+ m_frameEncoder[i]->m_jpId = m_threadPool[pool].m_numProviders++;
+ m_threadPool[pool].m_jpTable[m_frameEncoder[i]->m_jpId] = m_frameEncoder[i];
+ }
+ for (int i = 0; i < m_numPools; i++)
+ m_threadPool[i].start();
}
if (!m_scalingList.init())
@@ -171,24 +185,13 @@
m_aborted = true;
m_scalingList.setupQuantMatrices();
- /* Allocate thread local data, one for each thread pool worker and
- * if --no-wpp, one for each frame encoder */
- m_numThreadLocalData = poolThreadCount;
- if (!m_param->bEnableWavefront)
- m_numThreadLocalData += m_param->frameNumThreads;
- m_threadLocalData = new ThreadLocalData[m_numThreadLocalData];
- for (int i = 0; i < m_numThreadLocalData; i++)
+ m_lookahead = new Lookahead(m_param, m_threadPool);
+ if (m_numPools)
{
- m_threadLocalData[i].analysis.setThreadPool(m_threadPool);
- m_threadLocalData[i].analysis.initSearch(*m_param, m_scalingList);
- m_threadLocalData[i].analysis.create(m_threadLocalData);
+ m_lookahead->m_jpId = m_threadPool[0].m_numProviders++;
+ m_threadPool[0].m_jpTable[m_lookahead->m_jpId] = m_lookahead;
}
- if (!m_param->bEnableWavefront)
- for (int i = 0; i < m_param->frameNumThreads; i++)
- m_frameEncoder[i]->m_tld = &m_threadLocalData[poolThreadCount + i];
-
- m_lookahead = new Lookahead(m_param, m_threadPool);
m_dpb = new DPB(m_param);
m_rateControl = new RateControl(m_param);
@@ -236,19 +239,19 @@
int numCols = (m_param->sourceWidth + g_maxCUSize - 1) / g_maxCUSize;
for (int i = 0; i < m_param->frameNumThreads; i++)
{
- if (!m_frameEncoder[i]->init(this, numRows, numCols, i))
+ if (!m_frameEncoder[i]->init(this, numRows, numCols))
{
x265_log(m_param, X265_LOG_ERROR, "Unable to initialize frame encoder, aborting\n");
m_aborted = true;
}
+ m_frameEncoder[i]->start();
}
-
if (m_param->bEmitHRDSEI)
m_rateControl->initHRD(&m_sps);
if (!m_rateControl->init(&m_sps))
m_aborted = true;
-
- m_lookahead->init();
+ if (!m_lookahead->create())
+ m_aborted = true;
if (m_param->analysisMode)
{
@@ -282,24 +285,28 @@
if (m_rateControl)
m_rateControl->terminate(); // unblock all blocked RC calls
+ if (m_lookahead)
+ m_lookahead->stop();
+
for (int i = 0; i < m_param->frameNumThreads; i++)
- {
+ if (m_frameEncoder[i]) m_frameEncoder[i]->getEncodedPicture(m_nalList);
+
+ for (int i = 0; i < m_param->frameNumThreads; i++)
if (m_frameEncoder[i])
{
- // Ensure frame encoder is idle before destroying it
- m_frameEncoder[i]->getEncodedPicture(m_nalList);
m_frameEncoder[i]->destroy();
delete m_frameEncoder[i];
}
- }
- for (int i = 0; i < m_numThreadLocalData; i++)
- m_threadLocalData[i].destroy();
-
- delete [] m_threadLocalData;
+ // thread pools can be cleaned up now that all the JobProviders are
+ // known to be shut down
+ delete [] m_threadPool;
if (m_lookahead)
- m_lookahead->stop();
+ {
+ m_lookahead->destroy();
+ delete m_lookahead;
+ }
delete m_dpb;
if (m_rateControl)
@@ -308,16 +315,6 @@
delete m_rateControl;
}
- // thread pool release should always happen last
- if (m_threadPool)
- m_threadPool->release();
-
- if (m_lookahead)
- {
- m_lookahead->destroy();
- delete m_lookahead;
- }
-
X265_FREE(m_cuOffsetY);
X265_FREE(m_cuOffsetC);
X265_FREE(m_buOffsetY);
@@ -445,9 +442,9 @@
inFrame = m_dpb->m_freeList.popBack();
/* Copy input picture into a Frame and PicYuv, send to lookahead */
- inFrame->m_poc = ++m_pocLast;
inFrame->m_fencPic->copyFromPicture(*pic_in, m_sps.conformanceWindow.rightOffset, m_sps.conformanceWindow.bottomOffset);
+ inFrame->m_poc = ++m_pocLast;
inFrame->m_userData = pic_in->userData;
inFrame->m_pts = pic_in->pts;
inFrame->m_forceqp = pic_in->forceqp;
@@ -459,21 +456,14 @@
/* Encoder holds a reference count until stats collection is finished */
ATOMIC_INC(&inFrame->m_countRefEncoders);
- bool bEnableWP = m_param->bEnableWeightedPred || m_param->bEnableWeightedBiPred;
- if (m_param->rc.aqMode || bEnableWP)
+
+ if ((m_param->rc.aqMode || m_param->bEnableWeightedPred || m_param->bEnableWeightedBiPred) &&
+ (m_param->rc.cuTree && m_param->rc.bStatRead))
{
- if (m_param->rc.cuTree && m_param->rc.bStatRead)
+ if (!m_rateControl->cuTreeReadFor2Pass(inFrame))
{
- if (!m_rateControl->cuTreeReadFor2Pass(inFrame))
- {
- m_aborted = 1;
- return -1;
- }
- }
- else
- {
- ProfileScopeEvent(prelookahead);
- m_rateControl->calcAdaptiveQuantFrame(inFrame);
+ m_aborted = 1;
+ return -1;
}
}
@@ -496,7 +486,7 @@
sliceType = inputPic->analysisData.sliceType;
}
- m_lookahead->addPicture(inFrame, sliceType);
+ m_lookahead->addPicture(*inFrame, sliceType);
m_numDelayedPic++;
}
else
@@ -822,17 +812,19 @@
CUStats cuStats;
for (int i = 0; i < m_param->frameNumThreads; i++)
cuStats.accumulate(m_frameEncoder[i]->m_cuStats);
-
+
if (!cuStats.totalCTUTime)
return;
-#define ELAPSED_SEC(val) ((double)(val) / 1000000)
-#define ELAPSED_MSEC(val) ((double)(val) / 1000)
+ int totalWorkerCount = 0;
+ for (int i = 0; i < m_numPools; i++)
+ totalWorkerCount += m_threadPool[i].m_numWorkers;
- int64_t lookaheadWorkerTime = m_lookahead->m_slicetypeDecideElapsedTime;
- if (m_lookahead->usingWorkerThreads())
- /* if the lookahead is not using worker threads, processRow() time is already included in slicetypeDecide time */
- lookaheadWorkerTime += m_lookahead->m_est.m_processRowElapsedTime;
+ int64_t batchElapsedTime, coopSliceElapsedTime;
+ uint64_t batchCount, coopSliceCount;
+ m_lookahead->getWorkerStats(batchElapsedTime, batchCount, coopSliceElapsedTime, coopSliceCount);
+ int64_t lookaheadWorkerTime = m_lookahead->m_slicetypeDecideElapsedTime + m_lookahead->m_preLookaheadElapsedTime +
+ batchElapsedTime + coopSliceElapsedTime;
int64_t totalWorkerTime = cuStats.totalCTUTime + cuStats.loopFilterElapsedTime + cuStats.pmodeTime + cuStats.pmeTime + lookaheadWorkerTime;
int64_t elapsedEncodeTime = x265_mdate() - m_encodeStartTime;
@@ -851,6 +843,9 @@
int64_t unaccounted = (cuStats.totalCTUTime + cuStats.pmodeTime) -
(cuStats.intraAnalysisElapsedTime + cuStats.motionEstimationElapsedTime + interRDOTotalTime + intraRDOTotalTime);
+#define ELAPSED_SEC(val) ((double)(val) / 1000000)
+#define ELAPSED_MSEC(val) ((double)(val) / 1000)
+
if (m_param->bDistributeMotionEstimation && cuStats.countPMEMasters)
{
x265_log(m_param, X265_LOG_INFO, "CU: %%%05.2lf time spent in motion estimation, averaging %.3lf CU inter modes per CTU\n",
@@ -891,10 +886,10 @@
ELAPSED_MSEC(cuStats.pmodeTime) / cuStats.countPModeTasks);
}
- x265_log(m_param, X265_LOG_INFO, "CU: %%%05.2lf time spent in slicetypeDecide (avg %.3lfms) and lookahead row cost (avg %.3lfns)\n",
+ x265_log(m_param, X265_LOG_INFO, "CU: %%%05.2lf time spent in slicetypeDecide (avg %.3lfms) and prelookahead (avg %.3lfms)\n",
100.0 * lookaheadWorkerTime / totalWorkerTime,
ELAPSED_MSEC(m_lookahead->m_slicetypeDecideElapsedTime) / m_lookahead->m_countSlicetypeDecide,
- (double)m_lookahead->m_est.m_processRowElapsedTime / m_lookahead->m_est.m_countProcessRow);
+ ELAPSED_MSEC(m_lookahead->m_preLookaheadElapsedTime) / m_lookahead->m_countPreLookahead);
x265_log(m_param, X265_LOG_INFO, "CU: %%%05.2lf time spent in other tasks\n",
100.0 * unaccounted / totalWorkerTime);
@@ -927,9 +922,9 @@
cuStats.totalCTUs / ELAPSED_SEC(totalWorkerTime));
if (m_threadPool)
- x265_log(m_param, X265_LOG_INFO, "CU: %.3lf average worker occupancy, %%%05.2lf of theoretical maximum occupancy\n",
+ x265_log(m_param, X265_LOG_INFO, "CU: %.3lf average worker utilization, %%%05.2lf of theoretical maximum utilization\n",
(double)totalWorkerTime / elapsedEncodeTime,
- 100.0 * totalWorkerTime / (elapsedEncodeTime * m_threadPool->getThreadCount()));
+ 100.0 * totalWorkerTime / (elapsedEncodeTime * totalWorkerCount));
#undef ELAPSED_SEC
#undef ELAPSED_MSEC
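The static assignment of frame encoders to pools in the encoder.cpp hunks above amounts to a round-robin over the pools, with each pool handing out its next provider slot. The following is a rough sketch of that bookkeeping only. PoolAssignment and distributeFrameEncoders() are hypothetical names introduced for illustration, and a plain counter per pool stands in for ThreadPool::m_numProviders.

```cpp
#include <cassert>
#include <vector>

// Hypothetical record of where one frame encoder ended up.
struct PoolAssignment
{
    int pool; // index of the thread pool serving this frame encoder
    int jpId; // slot of the frame encoder in that pool's provider table
};

// Distribute frameNumThreads frame encoders evenly over numPools pools.
std::vector<PoolAssignment> distributeFrameEncoders(int frameNumThreads, int numPools)
{
    assert(numPools > 0);
    std::vector<int> numProviders(numPools, 0); // per-pool provider counters
    std::vector<PoolAssignment> out;
    for (int i = 0; i < frameNumThreads; i++)
    {
        int pool = i % numPools;
        out.push_back({ pool, numProviders[pool]++ });
    }
    return out;
}
```

With 5 frame encoders and 2 pools, pool 0 serves encoders 0, 2 and 4 (slots 0, 1, 2) and pool 1 serves encoders 1 and 3 (slots 0, 1). In the patch the lookahead then takes the next free slot in pool 0.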
diff -r d34f9d23b370 -r 4e72e207f865 source/encoder/encoder.h
--- a/source/encoder/encoder.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/encoder/encoder.h Thu Feb 19 09:45:59 2015 -0600
@@ -70,7 +70,6 @@
class Lookahead;
class RateControl;
class ThreadPool;
-struct ThreadLocalData;
class Encoder : public x265_encoder
{
@@ -91,6 +90,7 @@
Frame* m_exportedPic;
+ int m_numPools;
int m_curEncoder;
/* cached PicYuv offset arrays, shared by all instances of
@@ -120,14 +120,12 @@
PPS m_pps;
NALList m_nalList;
ScalingList m_scalingList; // quantization matrix information
- int m_numThreadLocalData;
int m_lastBPSEI;
uint32_t m_numDelayedPic;
x265_param* m_param;
RateControl* m_rateControl;
- ThreadLocalData* m_threadLocalData;
Lookahead* m_lookahead;
Window m_conformanceWindow;
diff -r d34f9d23b370 -r 4e72e207f865 source/encoder/frameencoder.cpp
--- a/source/encoder/frameencoder.cpp Thu Feb 19 13:06:32 2015 -0600
+++ b/source/encoder/frameencoder.cpp Thu Feb 19 09:45:59 2015 -0600
@@ -39,13 +39,11 @@
void weightAnalyse(Slice& slice, Frame& frame, x265_param& param);
FrameEncoder::FrameEncoder()
- : WaveFront(NULL)
- , m_threadActive(true)
{
m_prevOutputTime = x265_mdate();
- m_totalWorkerElapsedTime = 0;
+ m_isFrameEncoder = true;
+ m_threadActive = true;
m_slicetypeWaitTime = 0;
- m_frameEncoderID = 0;
m_activeWorkerCount = 0;
m_bAllRowsStop = false;
m_vbvResetTriggerRow = -1;
@@ -66,7 +64,22 @@
void FrameEncoder::destroy()
{
if (m_pool)
- JobProvider::flush(); // ensure no worker threads are using this frame
+ {
+ if (!m_jpId)
+ {
+ int numTLD = m_pool->m_numWorkers;
+ if (!m_param->bEnableWavefront)
+ numTLD += m_pool->m_numProviders;
+ for (int i = 0; i < numTLD; i++)
+ m_tld[i].destroy();
+ delete [] m_tld;
+ }
+ }
+ else
+ {
+ m_tld->destroy();
+ delete m_tld;
+ }
m_threadActive = false;
m_enable.trigger();
@@ -90,7 +103,7 @@
stop();
}
-bool FrameEncoder::init(Encoder *top, int numRows, int numCols, int id)
+bool FrameEncoder::init(Encoder *top, int numRows, int numCols)
{
m_top = top;
m_param = top->m_param;
@@ -99,7 +112,6 @@
m_filterRowDelay = (m_param->bEnableSAO && m_param->bSaoNonDeblocked) ?
2 : (m_param->bEnableSAO || m_param->bEnableLoopFilter ? 1 : 0);
m_filterRowDelayCus = m_filterRowDelay * numCols;
- m_frameEncoderID = id;
m_rows = new CTURow[m_numRows];
bool ok = !!m_numRows;
@@ -134,7 +146,6 @@
else
m_param->noiseReductionIntra = m_param->noiseReductionInter = 0;
- start();
return ok;
}
@@ -205,7 +216,9 @@
{
m_slicetypeWaitTime = x265_mdate() - m_prevOutputTime;
m_frame = curFrame;
- curFrame->m_encData->m_frameEncoderID = m_frameEncoderID; // Each Frame knows the ID of the FrameEncoder encoding it
+ m_sliceType = curFrame->m_lowres.sliceType;
+ curFrame->m_encData->m_frameEncoderID = m_jpId;
+ curFrame->m_encData->m_jobProvider = this;
curFrame->m_encData->m_slice->m_mref = m_mref;
if (!m_cuGeoms)
@@ -220,19 +233,61 @@
void FrameEncoder::threadMain()
{
- THREAD_NAME("Frame", m_frameEncoderID);
+ THREAD_NAME("Frame", m_jpId);
- // worker thread routine for FrameEncoder
- do
+ if (m_pool)
{
- m_enable.wait(); // Encoder::encode() triggers this event
- if (m_threadActive)
+ m_pool->setCurrentThreadAffinity();
+
+ /* the first FE on each NUMA node is responsible for allocating thread
+ * local data for all worker threads in that pool. If WPP is disabled, then
+ * each FE also needs a TLD instance */
+ if (!m_jpId)
{
- compressFrame();
- m_done.trigger(); // FrameEncoder::getEncodedPicture() blocks for this event
+ int numTLD = m_pool->m_numWorkers;
+ if (!m_param->bEnableWavefront)
+ numTLD += m_pool->m_numProviders;
+
+ m_tld = new ThreadLocalData[numTLD];
+ for (int i = 0; i < numTLD; i++)
+ {
+ m_tld[i].analysis.m_pool = m_pool;
+ m_tld[i].analysis.initSearch(*m_param, m_top->m_scalingList);
+ m_tld[i].analysis.create(m_tld);
+ }
+
+ for (int i = 0; i < m_pool->m_numProviders; i++)
+ {
+ if (m_pool->m_jpTable[i]->m_isFrameEncoder) /* ugh; over-allocation and other issues here */
+ {
+ FrameEncoder *peer = dynamic_cast<FrameEncoder*>(m_pool->m_jpTable[i]);
+ peer->m_tld = m_tld;
+ }
+ }
}
+
+ if (m_param->bEnableWavefront)
+ m_localTldIdx = -1; // cause exception if used
+ else
+ m_localTldIdx = m_pool->m_numWorkers + m_jpId;
}
- while (m_threadActive);
+ else
+ {
+ m_tld = new ThreadLocalData;
+ m_tld->analysis.m_pool = NULL;
+ m_tld->analysis.initSearch(*m_param, m_top->m_scalingList);
+ m_tld->analysis.create(NULL);
+ m_localTldIdx = 0;
+ }
+
+ m_enable.wait(); /* Encoder::encode() triggers this event */
+
+ while (m_threadActive)
+ {
+ compressFrame();
+ m_done.trigger(); /* FrameEncoder::getEncodedPicture() blocks for this event */
+ m_enable.wait();
+ }
}
void FrameEncoder::compressFrame()
@@ -488,15 +543,31 @@
if (m_top->m_rateControl->rateControlEnd(m_frame, m_accessUnitBits, &m_rce, &m_frameStats) < 0)
m_top->m_aborted = true;
- /* Accumulate NR statistics from all worker threads */
+ /* Decrement referenced frame reference counts, allow them to be recycled */
+ for (int l = 0; l < numPredDir; l++)
+ {
+ for (int ref = 0; ref < slice->m_numRefIdx[l]; ref++)
+ {
+ Frame *refpic = slice->m_refPicList[l][ref];
+ ATOMIC_DEC(&refpic->m_countRefEncoders);
+ }
+ }
+
+ int numTLD;
+ if (m_pool)
+ numTLD = m_param->bEnableWavefront ? m_pool->m_numWorkers : m_pool->m_numWorkers + m_pool->m_numProviders;
+ else
+ numTLD = 1;
+
if (m_nr)
{
- for (int i = 0; i < m_top->m_numThreadLocalData; i++)
+ /* Accumulate NR statistics from all worker threads */
+ for (int i = 0; i < numTLD; i++)
{
- NoiseReduction* nr = &m_top->m_threadLocalData[i].analysis.m_quant.m_frameNr[m_frameEncoderID];
+ NoiseReduction* nr = &m_tld[i].analysis.m_quant.m_frameNr[m_jpId];
for (int cat = 0; cat < MAX_NUM_TR_CATEGORIES; cat++)
{
- for(int coeff = 0; coeff < MAX_NUM_TR_COEFFS; coeff++)
+ for (int coeff = 0; coeff < MAX_NUM_TR_COEFFS; coeff++)
m_nr->residualSum[cat][coeff] += nr->residualSum[cat][coeff];
m_nr->count[cat] += nr->count[cat];
@@ -506,30 +577,20 @@
noiseReductionUpdate();
/* Copy updated NR coefficients back to all worker threads */
- for (int i = 0; i < m_top->m_numThreadLocalData; i++)
+ for (int i = 0; i < numTLD; i++)
{
- NoiseReduction* nr = &m_top->m_threadLocalData[i].analysis.m_quant.m_frameNr[m_frameEncoderID];
+ NoiseReduction* nr = &m_tld[i].analysis.m_quant.m_frameNr[m_jpId];
memcpy(nr->offsetDenoise, m_nr->offsetDenoise, sizeof(uint16_t) * MAX_NUM_TR_CATEGORIES * MAX_NUM_TR_COEFFS);
memset(nr->count, 0, sizeof(uint32_t) * MAX_NUM_TR_CATEGORIES);
memset(nr->residualSum, 0, sizeof(uint32_t) * MAX_NUM_TR_CATEGORIES * MAX_NUM_TR_COEFFS);
}
}
- // Decrement referenced frame reference counts, allow them to be recycled
- for (int l = 0; l < numPredDir; l++)
- {
- for (int ref = 0; ref < slice->m_numRefIdx[l]; ref++)
- {
- Frame *refpic = slice->m_refPicList[l][ref];
- ATOMIC_DEC(&refpic->m_countRefEncoders);
- }
- }
-
#if DETAILED_CU_STATS
/* Accumulate CU statistics from each worker thread, we could report
* per-frame stats here, but currently we do not. */
- for (int i = 0; i < m_top->m_numThreadLocalData; i++)
- m_cuStats.accumulate(m_top->m_threadLocalData[i].analysis.m_stats[m_frameEncoderID]);
+ for (int i = 0; i < numTLD; i++)
+ m_cuStats.accumulate(m_tld[i].analysis.m_stats[m_jpId]);
#endif
m_endFrameTime = x265_mdate();
@@ -621,10 +682,9 @@
int numPredDir = slice->isInterP() ? 1 : slice->isInterB() ? 2 : 0;
m_rows[0].active = true;
- if (m_pool && m_param->bEnableWavefront)
+ if (m_param->bEnableWavefront)
{
WaveFront::clearEnabledRowMask();
- WaveFront::enqueue();
for (uint32_t row = 0; row < m_numRows; row++)
{
@@ -645,19 +705,17 @@
}
enableRowEncoder(row);
- if (row)
- m_pool->pokeIdleThread();
- else
+ if (!row)
{
m_row0WaitTime = x265_mdate();
enqueueRowEncoder(0);
}
+ tryWakeOne();
}
m_allRowsAvailableTime = x265_mdate();
+ tryWakeOne(); /* ensure one thread is active or help-wanted flag is set prior to blocking */
m_completionEvent.wait();
-
- WaveFront::dequeue();
}
else
{
@@ -687,10 +745,9 @@
m_row0WaitTime = x265_mdate();
else if (i == m_numRows - 1)
m_allRowsAvailableTime = x265_mdate();
- processRowEncoder(i, *m_tld);
+ processRowEncoder(i, m_tld[m_localTldIdx]);
}
- // Filter
if (i >= m_filterRowDelay)
m_frameFilter.processRow(i - m_filterRowDelay);
}
@@ -706,10 +763,8 @@
const uint32_t realRow = row >> 1;
const uint32_t typeNum = row & 1;
- ThreadLocalData& tld = threadId >= 0 ? m_top->m_threadLocalData[threadId] : *m_tld;
-
if (!typeNum)
- processRowEncoder(realRow, tld);
+ processRowEncoder(realRow, m_tld[threadId]);
else
{
m_frameFilter.processRow(realRow);
@@ -932,21 +987,21 @@
}
}
- // NOTE: do CU level Filter
+ /* SAO parameter estimation using non-deblocked pixels for CTU bottom and right boundary areas */
if (m_param->bEnableSAO && m_param->bSaoNonDeblocked)
- // SAO parameter estimation using non-deblocked pixels for CTU bottom and right boundary areas
m_frameFilter.m_sao.calcSaoStatsCu_BeforeDblk(m_frame, col, row);
- // NOTE: active next row
- if (curRow.completed >= 2 && row < m_numRows - 1)
+ if (m_param->bEnableWavefront && curRow.completed >= 2 && row < m_numRows - 1 &&
+ (!m_bAllRowsStop || intRow + 1 < m_vbvResetTriggerRow))
{
+ /* activate next row */
ScopedLock below(m_rows[row + 1].lock);
if (m_rows[row + 1].active == false &&
- m_rows[row + 1].completed + 2 <= curRow.completed &&
- (!m_bAllRowsStop || intRow + 1 < m_vbvResetTriggerRow))
+ m_rows[row + 1].completed + 2 <= curRow.completed)
{
m_rows[row + 1].active = true;
enqueueRowEncoder(row + 1);
+ tryWakeOne(); /* wake up a sleeping thread or set the help wanted flag */
}
}
@@ -961,11 +1016,7 @@
}
}
- /* *this row of CTUs has been encoded* */
-
- /* flush row bitstream (if WPP and no SAO) or flush frame if no WPP and no SAO */
- if (!m_param->bEnableSAO && (m_param->bEnableWavefront || row == m_numRows - 1))
- rowCoder.finishSlice();
+ /** this row of CTUs has been compressed **/
/* If encoding with ABR, update update bits and complexity in rate control
* after a number of rows so the next frame's rateControlStart has more
@@ -994,6 +1045,10 @@
m_top->m_rateControl->rateControlUpdateStats(&m_rce);
}
+ /* flush row bitstream (if WPP and no SAO) or flush frame if no WPP and no SAO */
+ if (!m_param->bEnableSAO && (m_param->bEnableWavefront || row == m_numRows - 1))
+ rowCoder.finishSlice();
+
if (m_param->bEnableWavefront)
{
/* trigger row-wise loop filters */
@@ -1004,11 +1059,13 @@
/* NOTE: Activate filter if first row (row 0) */
if (row == m_filterRowDelay)
enqueueRowFilter(0);
+ tryWakeOne();
}
if (row == m_numRows - 1)
{
for (uint32_t i = m_numRows - m_filterRowDelay; i < m_numRows; i++)
enableRowFilter(i);
+ tryWakeOne();
}
}
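The ThreadLocalData sizing and indexing rules used in the frameencoder.cpp hunks above can be summarized in two small helpers. This is only an illustrative sketch of the arithmetic: tldCount() and localTldIndex() are hypothetical names, and the hasPool flag stands in for the `m_pool != NULL` test.

```cpp
#include <cassert>

// Number of ThreadLocalData instances to allocate.
// With a pool: one per worker, plus one per job provider when WPP is off.
// Without a pool: the frame encoder owns a single private instance.
int tldCount(bool hasPool, int numWorkers, int numProviders, bool wpp)
{
    assert(numWorkers >= 0 && numProviders >= 0);
    if (!hasPool)
        return 1;
    return wpp ? numWorkers : numWorkers + numProviders;
}

// Index of the frame encoder's own ThreadLocalData slot.
// With WPP the frame encoder thread never encodes rows itself, so -1 marks
// the slot as unusable; otherwise its slot follows the worker slots.
int localTldIndex(bool hasPool, int numWorkers, int jpId, bool wpp)
{
    if (!hasPool)
        return 0;
    return wpp ? -1 : numWorkers + jpId;
}
```

For a pool of 8 workers serving 3 providers with WPP disabled, 11 instances are allocated and the provider with m_jpId 2 uses slot 10. Worker threads index the array directly with their thread id.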
diff -r d34f9d23b370 -r 4e72e207f865 source/encoder/frameencoder.h
--- a/source/encoder/frameencoder.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/encoder/frameencoder.h Thu Feb 19 09:45:59 2015 -0600
@@ -122,7 +122,7 @@
virtual ~FrameEncoder() {}
- virtual bool init(Encoder *top, int numRows, int numCols, int id);
+ virtual bool init(Encoder *top, int numRows, int numCols);
void destroy();
@@ -136,7 +136,7 @@
Event m_done;
Event m_completionEvent;
bool m_threadActive;
- int m_frameEncoderID;
+ int m_localTldIdx;
uint32_t m_numRows;
uint32_t m_numCols;
diff -r d34f9d23b370 -r 4e72e207f865 source/x265.h
--- a/source/x265.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/x265.h Thu Feb 19 09:45:59 2015 -0600
@@ -369,13 +369,53 @@
* rate-control can be negatively impacted by increases to the number of
* frame threads because the extra concurrency adds uncertainty to the
* bitrate estimations. Frame parallelism is generally limited by the the
- * number of CTU rows */
+ * number of CTU rows
+ *
+ * When thread pools are used, each frame thread is assigned to a single
+ * pool and the frame thread itself is given the node affinity of its pool.
+ * But when no thread pools are used no node affinity is assigned. */
int frameNumThreads;
- /* Number of threads to allocate for the process global thread pool, if no
- * thread pool has yet been created. 0 implies auto-detection. By default
- * x265 will try to allocate one worker thread per CPU core */
- int poolNumThreads;
+ /* Comma separated list of threads per NUMA node. If "none", then no worker
+ * pools are created and only frame parallelism is possible. If NULL or ""
+ * (default) x265 will use all available threads on each NUMA node.
+ *
+ * '+' is a special value indicating all cores detected on the node
+ * '*' is a special value indicating all cores detected on the node and all
+ * remaining nodes.
+ * '-' is a special value indicating no cores on the node, same as '0'
+ *
+ * example strings for a 4-node system:
+ * "" - default, unspecified, all numa nodes are used for thread pools
+ * "*" - same as default
+ * "none" - no thread pools are created, only frame parallelism possible
+ * "-" - same as "none"
+ * "10" - allocate one pool, using up to 10 cores on node 0
+ * "-,+" - allocate one pool, using all cores on node 1
+ * "+,-,+" - allocate two pools, using all cores on nodes 0 and 2
+ * "+,-,+,-" - allocate two pools, using all cores on nodes 0 and 2
+ * "-,*" - allocate three pools, using all cores on nodes 1, 2 and 3
+ * "8,8,8,8" - allocate four pools with up to 8 threads in each pool
+ *
+ * The total number of threads will be determined by the number of threads
+ * assigned to all nodes. The worker threads will each be given affinity for
+ * their node; they will not be allowed to migrate between nodes, but they
+ * will be allowed to move between CPU cores within their node.
+ *
+ * If the three pool features: bEnableWavefront, bDistributeModeAnalysis and
+ * bDistributeMotionEstimation are all disabled, then numaPools is ignored
+ * and no thread pools are created.
+ *
+ * If "none" is specified, then all three of the thread pool features are
+ * implicitly disabled.
+ *
+ * Multiple thread pools will be allocated for any NUMA node with more than
+ * 64 logical CPU cores. But any given thread pool will always use at most
+ * one NUMA node.
+ *
+ * Frame encoders are distributed between the available thread pools, and
+ * the encoder will never generate more thread pools than frameNumThreads */
+ char* numaPools;
/* Enable wavefront parallel processing, greatly increases parallelism for
* less than 1% compression efficiency loss. Requires a thread pool, enabled
diff -r d34f9d23b370 -r 4e72e207f865 source/x265cli.h
--- a/source/x265cli.h Thu Feb 19 13:06:32 2015 -0600
+++ b/source/x265cli.h Thu Feb 19 09:45:59 2015 -0600
@@ -37,7 +37,8 @@
{ "version", no_argument, NULL, 'V' },
{ "asm", required_argument, NULL, 0 },
{ "no-asm", no_argument, NULL, 0 },
- { "threads", required_argument, NULL, 0 },
+ { "pools", required_argument, NULL, 0 },
+ { "numa-pools", required_argument, NULL, 0 },
{ "preset", required_argument, NULL, 'p' },
{ "tune", required_argument, NULL, 't' },
{ "frame-threads", required_argument, NULL, 'F' },
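To make the numaPools string format documented in x265.h above more concrete, here is a small sketch that turns such a string into a thread count per node. It is not the parser from this patch series (that lives in threadpool.cpp, which is not shown in this part of the diff). parsePoolString() and its coresPerNode argument are hypothetical, and the sketch ignores the 64-core pool split and the frameNumThreads cap described in the comment. It assumes nodes not mentioned in the string get no pool, as the "10" and "+,-,+" examples imply.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

// Returns the number of worker threads to create on each node (0 = no pool).
std::vector<int> parsePoolString(const char* s, const std::vector<int>& coresPerNode)
{
    assert(!coresPerNode.empty());
    std::vector<int> threads(coresPerNode.size(), 0);

    if (!s || !*s)
        return coresPerNode; // default: all cores on every node
    if (!strcmp(s, "none"))
        return threads;      // no pools at all

    std::string str(s);
    size_t pos = 0;
    for (size_t node = 0; node < coresPerNode.size() && pos <= str.size(); node++)
    {
        // cut out the next comma-delimited token
        size_t comma = str.find(',', pos);
        std::string tok = str.substr(pos, comma == std::string::npos ? std::string::npos : comma - pos);
        pos = comma == std::string::npos ? str.size() + 1 : comma + 1;

        if (tok == "*")
        {
            // all cores on this node and on all remaining nodes
            for (size_t n = node; n < coresPerNode.size(); n++)
                threads[n] = coresPerNode[n];
            break;
        }
        else if (tok == "+")
            threads[node] = coresPerNode[node];
        else if (tok == "-" || tok.empty())
            threads[node] = 0;
        else // explicit count, clamped to [0, cores on the node]
            threads[node] = std::max(0, std::min(atoi(tok.c_str()), coresPerNode[node]));
    }
    return threads;
}
```

On a 4-node system with 16 cores per node, "-,*" yields {0, 16, 16, 16} and "+,-,+" yields {16, 0, 16, 0}, matching the examples in the header comment.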