path: root/src/support/threads.cpp
author     Alon Zakai <alonzakai@gmail.com>  2016-04-15 15:34:07 -0700
committer  Alon Zakai <alonzakai@gmail.com>  2016-04-15 15:34:07 -0700
commit     4dfeb1c3a84b13188c22e158c5947c68964cddff (patch)
tree       211536213451e145357727e51f3d1c48a127ea2f /src/support/threads.cpp
parent     f2905f962984df939555aad134c48a194b9e270d (diff)
Function parallelism (#343)
* Allow traversals to mark themselves as function-parallel, in which case we run them using a thread pool. Also mark some thread-safety risks (interned strings, arena allocators) with assertions that they are modified only on the main thread.
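The "modified only on the main thread" part of that change can be illustrated with the ThreadPool::isRunning() helper added in this file. Below is a minimal sketch, not Binaryen's actual code: StringInterner is a hypothetical stand-in for the real guarded structures (interned strings, arena allocators), and the include path is assumed.

#include <cassert>
#include <string>
#include <unordered_set>

#include "support/threads.h" // assumed path to the header declaring wasm::ThreadPool

// Hypothetical shared structure that is not thread-safe. Function-parallel
// passes must never mutate it; the assertion enforces that mutation happens
// only on the main thread while the pool is idle, mirroring the checks this
// commit adds around interned strings and arena allocators.
struct StringInterner {
  std::unordered_set<std::string> storage;

  const std::string& intern(const std::string& s) {
    assert(!wasm::ThreadPool::get()->isRunning()); // main thread only, pool idle
    return *storage.insert(s).first;
  }
};

The Thread constructor and destructor in the diff below use the same guard, so worker threads can only be created or torn down while no parallel work is in flight.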
Diffstat (limited to 'src/support/threads.cpp')
-rw-r--r--  src/support/threads.cpp  181
1 file changed, 181 insertions, 0 deletions
diff --git a/src/support/threads.cpp b/src/support/threads.cpp
new file mode 100644
index 000000000..6edff21cb
--- /dev/null
+++ b/src/support/threads.cpp
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2016 WebAssembly Community Group participants
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+
+#include <iostream>
+
+#include "threads.h"
+
+
+// debugging tools
+
+#ifdef BINARYEN_THREAD_DEBUG
+static std::mutex debug;
+#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; }
+#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; }
+#else
+#define DEBUG_THREAD(x)
+#define DEBUG_POOL(x)
+#endif
+
+
+namespace wasm {
+
+// Global thread information
+
+static std::unique_ptr<ThreadPool> pool;
+
+
+// Thread
+
+Thread::Thread() {
+ assert(!ThreadPool::get()->isRunning());
+ thread = std::unique_ptr<std::thread>(new std::thread(mainLoop, this));
+}
+
+Thread::~Thread() {
+ assert(!ThreadPool::get()->isRunning());
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ // notify the thread that it can exit
+ done = true;
+ condition.notify_one();
+ }
+ thread->join();
+}
+
+void Thread::work(std::function<ThreadWorkState ()> doWork_) {
+ // TODO: fancy work stealing
+ DEBUG_THREAD("send work to thread\n");
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ // notify the thread that it can do some work
+ doWork = doWork_;
+ condition.notify_one();
+ DEBUG_THREAD("work sent\n");
+ }
+}
+
+void Thread::mainLoop(void *self_) {
+ auto* self = static_cast<Thread*>(self_);
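+ // Each loop iteration: under our mutex, run any pending work (or exit if we
+ // have been told we are done); then, with the mutex released, tell the pool
+ // this thread is ready; finally re-take the mutex and wait for the next
+ // command. The second locked check of done/doWork ensures a command that
+ // arrived between the two critical sections is not slept through.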
+ while (1) {
+ DEBUG_THREAD("checking for work\n");
+ {
+ std::unique_lock<std::mutex> lock(self->mutex);
+ if (self->doWork) {
+ DEBUG_THREAD("doing work\n");
+ // run tasks until they are all done
+ while (self->doWork() == ThreadWorkState::More) {}
+ self->doWork = nullptr;
+ } else if (self->done) {
+ DEBUG_THREAD("done\n");
+ return;
+ }
+ }
+ ThreadPool::get()->notifyThreadIsReady();
+ {
+ std::unique_lock<std::mutex> lock(self->mutex);
+ if (!self->done && !self->doWork) {
+ DEBUG_THREAD("thread waiting\n");
+ self->condition.wait(lock);
+ }
+ }
+ }
+}
+
+
+// ThreadPool
+
+void ThreadPool::initialize(size_t num) {
+ if (num == 1) return; // no multiple cores, don't create threads
+ DEBUG_POOL("initialize()\n");
+ std::unique_lock<std::mutex> lock(mutex);
+ ready.store(threads.size()); // initial state before first resetThreadsAreReady()
+ resetThreadsAreReady();
+ for (size_t i = 0; i < num; i++) {
+ threads.emplace_back(std::unique_ptr<Thread>(new Thread()));
+ }
+ DEBUG_POOL("initialize() waiting\n");
+ condition.wait(lock, [this]() { return areThreadsReady(); });
+ DEBUG_POOL("initialize() is done\n");
+}
+
+ThreadPool* ThreadPool::get() {
+ if (!pool) {
+ size_t num = std::max(1U, std::thread::hardware_concurrency());
+ pool = std::unique_ptr<ThreadPool>(new ThreadPool());
+ pool->initialize(num);
+ }
+ return pool.get();
+}
+
+void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) {
+ size_t num = threads.size();
+ // If there are no worker threads (no multiple cores, or we are on a side thread), do not use them
+ if (num == 0) {
+ // just run sequentially
+ DEBUG_POOL("work() sequentially\n");
+ assert(doWorkers.size() > 0);
+ while (doWorkers[0]() == ThreadWorkState::More) {}
+ return;
+ }
+ // run in parallel on threads
+ // TODO: fancy work stealing
+ DEBUG_POOL("work() on threads\n");
+ assert(doWorkers.size() == num);
+ assert(!running);
+ running = true;
+ std::unique_lock<std::mutex> lock(mutex);
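+ // Handshake with the workers: zero the ready counter, hand each thread its
+ // worker function, then block until every thread has incremented the counter
+ // again via notifyThreadIsReady() after running its work to completion.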
+ resetThreadsAreReady();
+ for (size_t i = 0; i < num; i++) {
+ threads[i]->work(doWorkers[i]);
+ }
+ DEBUG_POOL("main thread waiting\n");
+ condition.wait(lock, [this]() { return areThreadsReady(); });
+ DEBUG_POOL("main thread waiting\n");
+ running = false;
+ DEBUG_POOL("work() is done\n");
+}
+
+size_t ThreadPool::size() {
+ return threads.size();
+}
+
+bool ThreadPool::isRunning() {
+ return pool && pool->running;
+}
+
+void ThreadPool::notifyThreadIsReady() {
+ DEBUG_POOL("notify thread is ready\n";)
+ std::lock_guard<std::mutex> lock(mutex);
+ ready.fetch_add(1);
+ condition.notify_one();
+}
+
+void ThreadPool::resetThreadsAreReady() {
+ DEBUG_POOL("reset threads are ready\n";)
+ auto old = ready.exchange(0);
+ assert(old == threads.size());
+}
+
+bool ThreadPool::areThreadsReady() {
+ DEBUG_POOL("are threads ready?\n";)
+ return ready.load() == threads.size();
+}
+
+} // namespace wasm
+
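For reference, here is a sketch of how a caller might drive this pool: one worker function per pool thread, each repeatedly claiming a task index from a shared atomic counter until the task list is exhausted. ThreadWorkState::More appears in the diff above; the name of the terminating enumerator (Finished) and the include path are assumptions, since threads.h itself is not part of this diff.

#include <algorithm>
#include <atomic>
#include <functional>
#include <vector>

#include "support/threads.h" // assumed path to ThreadPool / ThreadWorkState

// Run a list of independent tasks on the global pool. When the pool has no
// worker threads (single core), work() runs the single worker on the caller.
void runTasksInParallel(std::vector<std::function<void()>>& tasks) {
  std::atomic<size_t> nextTask(0);
  auto* pool = wasm::ThreadPool::get();
  // work() asserts one worker function per pool thread (or exactly one when
  // the pool is empty and everything runs sequentially on the caller).
  size_t numWorkers = std::max<size_t>(1, pool->size());
  std::vector<std::function<wasm::ThreadWorkState ()>> workers;
  for (size_t i = 0; i < numWorkers; i++) {
    workers.push_back([&]() -> wasm::ThreadWorkState {
      auto index = nextTask.fetch_add(1);
      if (index >= tasks.size()) {
        return wasm::ThreadWorkState::Finished; // assumed enumerator name
      }
      tasks[index]();
      return wasm::ThreadWorkState::More; // ask to be called again
    });
  }
  pool->work(workers); // blocks until every worker reports it is finished
}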