[x265] [PATCH] threadpool: use a wait event per worker thread
Steve Borho
steve at borho.org
Tue Jan 28 08:57:32 CET 2014
# HG changeset patch
# User Steve Borho <steve at borho.org>
# Date 1390894762 21600
# Tue Jan 28 01:39:22 2014 -0600
# Node ID ac40431a75590666df9c92c039a61e9f11123496
# Parent 637a937eee78242ba763a1a1e3cff1ac391520b1
threadpool: use a wait event per worker thread
For simplicity, this patch caps the number of worker threads at 64. The sleep
bitmap could be trivially extended if more threads are ever needed.
This removes the common wake event, which complicated startup, shutdown, and
flush events.
diff -r 637a937eee78 -r ac40431a7559 source/common/threadpool.cpp
--- a/source/common/threadpool.cpp Tue Jan 28 00:17:28 2014 -0600
+++ b/source/common/threadpool.cpp Tue Jan 28 01:39:22 2014 -0600
@@ -25,6 +25,7 @@
#include "threadpool.h"
#include "threading.h"
+#include "common.h"
#include <assert.h>
#include <string.h>
#include <new>
@@ -48,35 +49,39 @@
PoolThread& operator =(const PoolThread&);
+ int m_id;
+
bool m_dirty;
- bool m_idle;
-
bool m_exited;
+ Event m_wakeEvent;
+
public:
- PoolThread(ThreadPoolImpl& pool) : m_pool(pool), m_dirty(false), m_idle(false), m_exited(false) {}
+ PoolThread(ThreadPoolImpl& pool, int id)
+ : m_pool(pool)
+ , m_id(id)
+ , m_dirty(false)
+ , m_exited(false)
+ {
+ }
- //< query if thread is still potentially walking provider list
- bool isDirty() const { return !m_idle && m_dirty; }
-
- //< set m_dirty if the thread might be walking provider list
- void markDirty() { m_dirty = !m_idle; }
+ bool isDirty() const { return m_dirty; }
+ void markDirty() { m_dirty = true; }
bool isExited() const { return m_exited; }
+ void poke() { m_wakeEvent.trigger(); }
+
virtual ~PoolThread() {}
void threadMain();
- static volatile int s_sleepCount;
- static Event s_wakeEvent;
+ static volatile uint64_t s_sleepMap;
};
-volatile int PoolThread::s_sleepCount = 0;
-
-Event PoolThread::s_wakeEvent;
+volatile uint64_t PoolThread::s_sleepMap /* = 0 */;
class ThreadPoolImpl : public ThreadPool
{
@@ -155,14 +160,14 @@
cur = cur->m_nextProvider;
}
+ // this thread has reached the end of the provider list
m_dirty = false;
+
if (cur == NULL)
{
- m_idle = true;
- ATOMIC_INC(&s_sleepCount);
- s_wakeEvent.wait();
- ATOMIC_DEC(&s_sleepCount);
- m_idle = false;
+ uint64_t bit = 1LL << m_id;
+ ATOMIC_OR(&s_sleepMap, bit);
+ m_wakeEvent.wait();
}
}
@@ -171,7 +176,17 @@
void ThreadPoolImpl::pokeIdleThread()
{
- PoolThread::s_wakeEvent.trigger();
+ /* Find a bit in the sleeping thread bitmap and poke it awake */
+ uint64_t oldval = PoolThread::s_sleepMap;
+ if (oldval)
+ {
+ unsigned long id;
+ CTZ64(id, oldval);
+
+ uint64_t newval = oldval & ~(1LL << id);
+ if (ATOMIC_CAS(&PoolThread::s_sleepMap, oldval, newval) == oldval)
+ m_threads[id].poke();
+ }
}
ThreadPoolImpl *ThreadPoolImpl::instance;
@@ -211,6 +226,7 @@
{
if (numThreads == 0)
numThreads = get_cpu_count();
+ numThreads = X265_MIN(64, numThreads); // do not overflow sleep map
char *buffer = new char[sizeof(PoolThread) * numThreads];
m_threads = reinterpret_cast<PoolThread*>(buffer);
@@ -218,16 +234,19 @@
if (m_threads)
{
+ uint64_t idlemap = 0;
+
m_ok = true;
for (int i = 0; i < numThreads; i++)
{
- new (buffer)PoolThread(*this);
+ new (buffer)PoolThread(*this, i);
buffer += sizeof(PoolThread);
m_ok = m_ok && m_threads[i].start();
+ idlemap |= (1LL << i);
}
// Wait for threads to spin up and idle
- while (PoolThread::s_sleepCount < m_numThreads)
+ while (PoolThread::s_sleepMap != idlemap)
{
GIVE_UP_TIME();
}
@@ -238,18 +257,24 @@
{
if (m_ok)
{
+ uint64_t idlemap = 0;
+ for (int i = 0; i < m_numThreads; i++)
+ idlemap |= (1LL << i);
+
// wait for all threads to idle
- while (PoolThread::s_sleepCount < m_numThreads)
+ while (PoolThread::s_sleepMap != idlemap)
{
GIVE_UP_TIME();
}
// set invalid flag, then wake them up so they exit their main func
m_ok = false;
- int exited_count;
+ for (int i = 0; i < m_numThreads; i++)
+ pokeIdleThread();
+
+ int exited_count = 0;
do
{
- pokeIdleThread();
GIVE_UP_TIME();
exited_count = 0;
for (int i = 0; i < m_numThreads; i++)
@@ -319,14 +344,14 @@
p.m_prevProvider = NULL;
}
-/* Ensure all threads are either idle, or have made a full
- * pass through the provider list, ensuring dequeued providers
- * are safe for deletion. */
+/* Ensure all threads have made a full pass through the provider list, ensuring
+ * dequeued providers are safe for deletion. */
void ThreadPoolImpl::FlushProviderList()
{
for (int i = 0; i < m_numThreads; i++)
{
m_threads[i].markDirty();
+ m_threads[i].poke();
}
int i;
More information about the x265-devel
mailing list