diff options
Diffstat (limited to 'src/support/threads.cpp')
-rw-r--r-- | src/support/threads.cpp | 62 |
1 files changed, 31 insertions, 31 deletions
diff --git a/src/support/threads.cpp b/src/support/threads.cpp index c4f714f8e..ce880ac2d 100644 --- a/src/support/threads.cpp +++ b/src/support/threads.cpp @@ -30,7 +30,7 @@ #ifdef BINARYEN_THREAD_DEBUG static std::mutex debug; #define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; } -#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; } +#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL " << std::this_thread::get_id() << "] " << x; } #else #define DEBUG_THREAD(x) #define DEBUG_POOL(x) @@ -39,21 +39,15 @@ static std::mutex debug; namespace wasm { -// Global thread information - -static std::mutex poolMutex; -static std::unique_ptr<ThreadPool> pool; - - // Thread -Thread::Thread() { - assert(!ThreadPool::isRunning()); +Thread::Thread(ThreadPool* parent) : parent(parent) { + assert(!parent->isRunning()); thread = make_unique<std::thread>(mainLoop, this); } Thread::~Thread() { - assert(!ThreadPool::isRunning()); + assert(!parent->isRunning()); { std::lock_guard<std::mutex> lock(mutex); // notify the thread that it can exit @@ -91,7 +85,7 @@ void Thread::mainLoop(void *self_) { return; } } - ThreadPool::get()->notifyThreadIsReady(); + self->parent->notifyThreadIsReady(); { std::unique_lock<std::mutex> lock(self->mutex); if (!self->done && !self->doWork) { @@ -102,18 +96,26 @@ void Thread::mainLoop(void *self_) { } } - // ThreadPool +// Global threadPool state. We have a singleton pool, which can only be +// used from one place at a time. + +static std::unique_ptr<ThreadPool> pool; + +std::mutex ThreadPool::creationMutex; +std::mutex ThreadPool::workMutex; +std::mutex ThreadPool::threadMutex; + void ThreadPool::initialize(size_t num) { if (num == 1) return; // no multiple cores, don't create threads DEBUG_POOL("initialize()\n"); - std::unique_lock<std::mutex> lock(mutex); + std::unique_lock<std::mutex> lock(threadMutex); ready.store(threads.size()); // initial state before first resetThreadsAreReady() resetThreadsAreReady(); for (size_t i = 0; i < num; i++) { try { - threads.emplace_back(make_unique<Thread>()); + threads.emplace_back(make_unique<Thread>(this)); } catch (std::system_error&) { // failed to create a thread - don't use multithreading, as if num cores == 1 DEBUG_POOL("could not create thread\n"); @@ -140,21 +142,14 @@ size_t ThreadPool::getNumCores() { ThreadPool* ThreadPool::get() { DEBUG_POOL("::get()\n"); - bool created = false; - { - // lock on the creation - std::lock_guard<std::mutex> lock(poolMutex); - if (!pool) { - DEBUG_POOL("::get() creating\n"); - created = true; - pool = make_unique<ThreadPool>(); - } - } - if (created) { - // if we created it here, do the initialization too. this - // is outside of the mutex, as we create child threads who - // will call ::get() themselves - pool->initialize(getNumCores()); + // lock on the creation + std::lock_guard<std::mutex> poolLock(creationMutex); + if (!pool) { + DEBUG_POOL("::get() creating\n"); + std::unique_ptr<ThreadPool> temp = make_unique<ThreadPool>(); + temp->initialize(getNumCores()); + // assign it to the global location now that it is all ready + pool.swap(temp); DEBUG_POOL("::get() created\n"); } return pool.get(); @@ -173,10 +168,14 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) // run in parallel on threads // TODO: fancy work stealing DEBUG_POOL("work() on threads\n"); + // lock globally on doing work in the pool - the threadPool can only be used + // from one thread at a time, all others must wait patiently + std::lock_guard<std::mutex> poolLock(workMutex); assert(doWorkers.size() == num); assert(!running); + DEBUG_POOL("running = true\n"); running = true; - std::unique_lock<std::mutex> lock(mutex); + std::unique_lock<std::mutex> lock(threadMutex); resetThreadsAreReady(); for (size_t i = 0; i < num; i++) { threads[i]->work(doWorkers[i]); @@ -184,6 +183,7 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) DEBUG_POOL("main thread waiting\n"); condition.wait(lock, [this]() { return areThreadsReady(); }); DEBUG_POOL("main thread waiting\n"); + DEBUG_POOL("running = false\n"); running = false; DEBUG_POOL("work() is done\n"); } @@ -199,7 +199,7 @@ bool ThreadPool::isRunning() { void ThreadPool::notifyThreadIsReady() { DEBUG_POOL("notify thread is ready\n";) - std::lock_guard<std::mutex> lock(mutex); + std::lock_guard<std::mutex> lock(threadMutex); ready.fetch_add(1); condition.notify_one(); } |