summaryrefslogtreecommitdiff
path: root/src/support/threads.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/support/threads.cpp')
-rw-r--r--src/support/threads.cpp62
1 files changed, 31 insertions, 31 deletions
diff --git a/src/support/threads.cpp b/src/support/threads.cpp
index c4f714f8e..ce880ac2d 100644
--- a/src/support/threads.cpp
+++ b/src/support/threads.cpp
@@ -30,7 +30,7 @@
#ifdef BINARYEN_THREAD_DEBUG
static std::mutex debug;
#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; }
-#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; }
+#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL " << std::this_thread::get_id() << "] " << x; }
#else
#define DEBUG_THREAD(x)
#define DEBUG_POOL(x)
@@ -39,21 +39,15 @@ static std::mutex debug;
namespace wasm {
-// Global thread information
-
-static std::mutex poolMutex;
-static std::unique_ptr<ThreadPool> pool;
-
-
// Thread
-Thread::Thread() {
- assert(!ThreadPool::isRunning());
+Thread::Thread(ThreadPool* parent) : parent(parent) {
+ assert(!parent->isRunning());
thread = make_unique<std::thread>(mainLoop, this);
}
Thread::~Thread() {
- assert(!ThreadPool::isRunning());
+ assert(!parent->isRunning());
{
std::lock_guard<std::mutex> lock(mutex);
// notify the thread that it can exit
@@ -91,7 +85,7 @@ void Thread::mainLoop(void *self_) {
return;
}
}
- ThreadPool::get()->notifyThreadIsReady();
+ self->parent->notifyThreadIsReady();
{
std::unique_lock<std::mutex> lock(self->mutex);
if (!self->done && !self->doWork) {
@@ -102,18 +96,26 @@ void Thread::mainLoop(void *self_) {
}
}
-
// ThreadPool
+// Global threadPool state. We have a singleton pool, which can only be
+// used from one place at a time.
+
+static std::unique_ptr<ThreadPool> pool;
+
+std::mutex ThreadPool::creationMutex;
+std::mutex ThreadPool::workMutex;
+std::mutex ThreadPool::threadMutex;
+
void ThreadPool::initialize(size_t num) {
if (num == 1) return; // no multiple cores, don't create threads
DEBUG_POOL("initialize()\n");
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
ready.store(threads.size()); // initial state before first resetThreadsAreReady()
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
try {
- threads.emplace_back(make_unique<Thread>());
+ threads.emplace_back(make_unique<Thread>(this));
} catch (std::system_error&) {
// failed to create a thread - don't use multithreading, as if num cores == 1
DEBUG_POOL("could not create thread\n");
@@ -140,21 +142,14 @@ size_t ThreadPool::getNumCores() {
ThreadPool* ThreadPool::get() {
DEBUG_POOL("::get()\n");
- bool created = false;
- {
- // lock on the creation
- std::lock_guard<std::mutex> lock(poolMutex);
- if (!pool) {
- DEBUG_POOL("::get() creating\n");
- created = true;
- pool = make_unique<ThreadPool>();
- }
- }
- if (created) {
- // if we created it here, do the initialization too. this
- // is outside of the mutex, as we create child threads who
- // will call ::get() themselves
- pool->initialize(getNumCores());
+ // lock on the creation
+ std::lock_guard<std::mutex> poolLock(creationMutex);
+ if (!pool) {
+ DEBUG_POOL("::get() creating\n");
+ std::unique_ptr<ThreadPool> temp = make_unique<ThreadPool>();
+ temp->initialize(getNumCores());
+ // assign it to the global location now that it is all ready
+ pool.swap(temp);
DEBUG_POOL("::get() created\n");
}
return pool.get();
@@ -173,10 +168,14 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
// run in parallel on threads
// TODO: fancy work stealing
DEBUG_POOL("work() on threads\n");
+ // lock globally on doing work in the pool - the threadPool can only be used
+ // from one thread at a time, all others must wait patiently
+ std::lock_guard<std::mutex> poolLock(workMutex);
assert(doWorkers.size() == num);
assert(!running);
+ DEBUG_POOL("running = true\n");
running = true;
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
threads[i]->work(doWorkers[i]);
@@ -184,6 +183,7 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
DEBUG_POOL("main thread waiting\n");
condition.wait(lock, [this]() { return areThreadsReady(); });
DEBUG_POOL("main thread waiting\n");
+ DEBUG_POOL("running = false\n");
running = false;
DEBUG_POOL("work() is done\n");
}
@@ -199,7 +199,7 @@ bool ThreadPool::isRunning() {
void ThreadPool::notifyThreadIsReady() {
DEBUG_POOL("notify thread is ready\n";)
- std::lock_guard<std::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(threadMutex);
ready.fetch_add(1);
condition.notify_one();
}