summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlon Zakai <alonzakai@gmail.com>2018-01-26 14:32:06 -0800
committerGitHub <noreply@github.com>2018-01-26 14:32:06 -0800
commit6c081144e5bcf31edece0806c38d63c97257cc4c (patch)
tree517b939d2b0abdfe5b9c4908df62ddb9f7206093 /src
parent4ef6655e887e4088b8faf0cf5857fd49edbcd498 (diff)
downloadbinaryen-6c081144e5bcf31edece0806c38d63c97257cc4c.tar.gz
binaryen-6c081144e5bcf31edece0806c38d63c97257cc4c.tar.bz2
binaryen-6c081144e5bcf31edece0806c38d63c97257cc4c.zip
ThreadPool refactoring (#1389)
Refactor ThreadPool code for clarity and to fix some bugs with using the pool from different threads in parallel. We have a singleton pool, and need to ensure it is created only once and used only by one thread at a time. This model is a simple way to ensure we use a number of threads equal to the number of cores, more or less (a pool per Module might lead to number of cores * number of Modules being optimized). This refactoring adds a parent pointer in the worker threads (giving them direct access to the pool makes it simpler to make sure that pool and thread creation and teardown are threadsafe). This commit also adds proper locking around pool creation and pool usage.
Diffstat (limited to 'src')
-rw-r--r--src/support/threads.cpp62
-rw-r--r--src/support/threads.h14
2 files changed, 43 insertions, 33 deletions
diff --git a/src/support/threads.cpp b/src/support/threads.cpp
index c4f714f8e..ce880ac2d 100644
--- a/src/support/threads.cpp
+++ b/src/support/threads.cpp
@@ -30,7 +30,7 @@
#ifdef BINARYEN_THREAD_DEBUG
static std::mutex debug;
#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; }
-#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; }
+#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL " << std::this_thread::get_id() << "] " << x; }
#else
#define DEBUG_THREAD(x)
#define DEBUG_POOL(x)
@@ -39,21 +39,15 @@ static std::mutex debug;
namespace wasm {
-// Global thread information
-
-static std::mutex poolMutex;
-static std::unique_ptr<ThreadPool> pool;
-
-
// Thread
-Thread::Thread() {
- assert(!ThreadPool::isRunning());
+Thread::Thread(ThreadPool* parent) : parent(parent) {
+ assert(!parent->isRunning());
thread = make_unique<std::thread>(mainLoop, this);
}
Thread::~Thread() {
- assert(!ThreadPool::isRunning());
+ assert(!parent->isRunning());
{
std::lock_guard<std::mutex> lock(mutex);
// notify the thread that it can exit
@@ -91,7 +85,7 @@ void Thread::mainLoop(void *self_) {
return;
}
}
- ThreadPool::get()->notifyThreadIsReady();
+ self->parent->notifyThreadIsReady();
{
std::unique_lock<std::mutex> lock(self->mutex);
if (!self->done && !self->doWork) {
@@ -102,18 +96,26 @@ void Thread::mainLoop(void *self_) {
}
}
-
// ThreadPool
+// Global threadPool state. We have a singleton pool, which can only be
+// used from one place at a time.
+
+static std::unique_ptr<ThreadPool> pool;
+
+std::mutex ThreadPool::creationMutex;
+std::mutex ThreadPool::workMutex;
+std::mutex ThreadPool::threadMutex;
+
void ThreadPool::initialize(size_t num) {
if (num == 1) return; // no multiple cores, don't create threads
DEBUG_POOL("initialize()\n");
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
ready.store(threads.size()); // initial state before first resetThreadsAreReady()
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
try {
- threads.emplace_back(make_unique<Thread>());
+ threads.emplace_back(make_unique<Thread>(this));
} catch (std::system_error&) {
// failed to create a thread - don't use multithreading, as if num cores == 1
DEBUG_POOL("could not create thread\n");
@@ -140,21 +142,14 @@ size_t ThreadPool::getNumCores() {
ThreadPool* ThreadPool::get() {
DEBUG_POOL("::get()\n");
- bool created = false;
- {
- // lock on the creation
- std::lock_guard<std::mutex> lock(poolMutex);
- if (!pool) {
- DEBUG_POOL("::get() creating\n");
- created = true;
- pool = make_unique<ThreadPool>();
- }
- }
- if (created) {
- // if we created it here, do the initialization too. this
- // is outside of the mutex, as we create child threads who
- // will call ::get() themselves
- pool->initialize(getNumCores());
+ // lock on the creation
+ std::lock_guard<std::mutex> poolLock(creationMutex);
+ if (!pool) {
+ DEBUG_POOL("::get() creating\n");
+ std::unique_ptr<ThreadPool> temp = make_unique<ThreadPool>();
+ temp->initialize(getNumCores());
+ // assign it to the global location now that it is all ready
+ pool.swap(temp);
DEBUG_POOL("::get() created\n");
}
return pool.get();
@@ -173,10 +168,14 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
// run in parallel on threads
// TODO: fancy work stealing
DEBUG_POOL("work() on threads\n");
+ // lock globally on doing work in the pool - the threadPool can only be used
+ // from one thread at a time, all others must wait patiently
+ std::lock_guard<std::mutex> poolLock(workMutex);
assert(doWorkers.size() == num);
assert(!running);
+ DEBUG_POOL("running = true\n");
running = true;
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
threads[i]->work(doWorkers[i]);
@@ -184,6 +183,7 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
DEBUG_POOL("main thread waiting\n");
condition.wait(lock, [this]() { return areThreadsReady(); });
DEBUG_POOL("main thread waiting\n");
+ DEBUG_POOL("running = false\n");
running = false;
DEBUG_POOL("work() is done\n");
}
@@ -199,7 +199,7 @@ bool ThreadPool::isRunning() {
void ThreadPool::notifyThreadIsReady() {
DEBUG_POOL("notify thread is ready\n";)
- std::lock_guard<std::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(threadMutex);
ready.fetch_add(1);
condition.notify_one();
}
diff --git a/src/support/threads.h b/src/support/threads.h
index 0ec109e4d..280e19470 100644
--- a/src/support/threads.h
+++ b/src/support/threads.h
@@ -38,6 +38,8 @@ enum class ThreadWorkState {
Finished
};
+class ThreadPool;
+
//
// A helper thread.
//
@@ -45,6 +47,7 @@ enum class ThreadWorkState {
//
class Thread {
+ ThreadPool* parent;
std::unique_ptr<std::thread> thread;
std::mutex mutex;
std::condition_variable condition;
@@ -52,7 +55,7 @@ class Thread {
std::function<ThreadWorkState ()> doWork = nullptr;
public:
- Thread();
+ Thread(ThreadPool* parent);
~Thread();
// Start to do work, calling doWork() until
@@ -72,10 +75,17 @@ private:
class ThreadPool {
std::vector<std::unique_ptr<Thread>> threads;
bool running = false;
- std::mutex mutex;
std::condition_variable condition;
std::atomic<size_t> ready;
+ // A mutex for creating the pool safely
+ static std::mutex creationMutex;
+ // A mutex for work() so that the pool can only work on one
+ // thing at a time
+ static std::mutex workMutex;
+ // A mutex for communication with the worker threads
+ static std::mutex threadMutex;
+
private:
void initialize(size_t num);