author     Alon Zakai <alonzakai@gmail.com>    2018-01-26 14:32:06 -0800
committer  GitHub <noreply@github.com>         2018-01-26 14:32:06 -0800
commit     6c081144e5bcf31edece0806c38d63c97257cc4c (patch)
tree       517b939d2b0abdfe5b9c4908df62ddb9f7206093
parent     4ef6655e887e4088b8faf0cf5857fd49edbcd498 (diff)
ThreadPool refactoring (#1389)
Refactor the ThreadPool code for clarity and to fix some bugs when using the pool from different threads in parallel. We have a singleton pool, and need to ensure it is created only once and used by only one thread at a time. This model is a simple way to keep the number of threads roughly equal to the number of cores (a pool per Module could instead create number of cores * number of Modules threads when many Modules are optimized at once). This refactoring adds a parent pointer in the worker threads; giving them direct access to the pool makes it simpler to ensure that pool and thread creation and teardown are threadsafe. This commit also adds proper locking around pool creation and pool usage.
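
The locking pattern the commit message describes (one mutex serializing creation of the singleton pool, one mutex serializing use of the pool, and a parent pointer from each worker back to its pool) can be sketched roughly as follows. This is a minimal hypothetical illustration in C++11, not Binaryen's actual ThreadPool/Thread classes: the WorkPool and Worker names and the synchronous work() body are invented for brevity.

// Hypothetical sketch, not Binaryen's ThreadPool: a lazily created singleton
// pool guarded by two static mutexes, with each worker holding a pointer
// back to its parent pool (as the commit message describes).
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

class WorkPool {
  // Each worker keeps a direct pointer to the pool that owns it.
  struct Worker {
    WorkPool* parent;
    explicit Worker(WorkPool* parent) : parent(parent) {}
  };

  std::vector<Worker> workers;

  static std::mutex creationMutex;        // serializes creation of the singleton
  static std::mutex workMutex;            // only one caller may use the pool at a time
  static std::unique_ptr<WorkPool> pool;  // the singleton instance

  explicit WorkPool(unsigned num) {
    for (unsigned i = 0; i < num; i++) {
      workers.emplace_back(this); // hand each worker its parent pointer
    }
  }

public:
  // Create the singleton exactly once; later callers reuse it.
  static WorkPool* get() {
    std::lock_guard<std::mutex> lock(creationMutex);
    if (!pool) {
      unsigned num = std::thread::hardware_concurrency();
      if (num == 0) num = 1; // hardware_concurrency() may report 0
      pool.reset(new WorkPool(num));
    }
    return pool.get();
  }

  // Run a batch of tasks; concurrent callers queue up on workMutex.
  void work(const std::vector<std::function<void()>>& tasks) {
    std::lock_guard<std::mutex> lock(workMutex);
    for (auto& task : tasks) {
      task(); // a real pool would dispatch these to its worker threads
    }
  }
};

std::mutex WorkPool::creationMutex;
std::mutex WorkPool::workMutex;
std::unique_ptr<WorkPool> WorkPool::pool;

int main() {
  // Many threads hitting the singleton at once, as in test/example/cpp-threads.cpp.
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; i++) {
    threads.emplace_back([] {
      WorkPool::get()->work({ [] { std::cout << "task done\n"; } });
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return 0;
}

Taking creationMutex inside get() means concurrent first callers race only for the lock rather than for construction of the pool, and taking workMutex inside work() makes a second thread wait until an in-progress parallel section finishes instead of interleaving with it.
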
-rwxr-xr-x  auto_update_tests.py           2
-rwxr-xr-x  check.py                       2
-rw-r--r--  src/support/threads.cpp       62
-rw-r--r--  src/support/threads.h         14
-rw-r--r--  test/example/cpp-threads.cpp  58
-rw-r--r--  test/example/cpp-threads.txt   4
6 files changed, 109 insertions, 33 deletions
diff --git a/auto_update_tests.py b/auto_update_tests.py
index 10481cab5..050dab1d1 100755
--- a/auto_update_tests.py
+++ b/auto_update_tests.py
@@ -184,6 +184,8 @@ for t in sorted(os.listdir(os.path.join('test', 'example'))):
src, '-c', '-o', 'example.o',
'-Isrc', '-g', '-L' + libdir, '-pthread']
print 'build: ', ' '.join(extra)
+ if src.endswith('.cpp'):
+ extra += ['-std=c++11']
print os.getcwd()
subprocess.check_call(extra)
# Link against the binaryen C library DSO, using rpath
diff --git a/check.py b/check.py
index 5a926bf8e..40092d93a 100755
--- a/check.py
+++ b/check.py
@@ -505,6 +505,8 @@ def run_gcc_torture_tests():
# build the C file separately
extra = [NATIVECC, src, '-c', '-o', 'example.o',
'-I' + os.path.join(options.binaryen_root, 'src'), '-g', '-L' + os.path.join(options.binaryen_bin, '..', 'lib'), '-pthread']
+ if src.endswith('.cpp'):
+ extra += ['-std=c++11']
print 'build: ', ' '.join(extra)
subprocess.check_call(extra)
# Link against the binaryen C library DSO, using an executable-relative rpath
diff --git a/src/support/threads.cpp b/src/support/threads.cpp
index c4f714f8e..ce880ac2d 100644
--- a/src/support/threads.cpp
+++ b/src/support/threads.cpp
@@ -30,7 +30,7 @@
#ifdef BINARYEN_THREAD_DEBUG
static std::mutex debug;
#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; }
-#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; }
+#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL " << std::this_thread::get_id() << "] " << x; }
#else
#define DEBUG_THREAD(x)
#define DEBUG_POOL(x)
@@ -39,21 +39,15 @@ static std::mutex debug;
namespace wasm {
-// Global thread information
-
-static std::mutex poolMutex;
-static std::unique_ptr<ThreadPool> pool;
-
-
// Thread
-Thread::Thread() {
- assert(!ThreadPool::isRunning());
+Thread::Thread(ThreadPool* parent) : parent(parent) {
+ assert(!parent->isRunning());
thread = make_unique<std::thread>(mainLoop, this);
}
Thread::~Thread() {
- assert(!ThreadPool::isRunning());
+ assert(!parent->isRunning());
{
std::lock_guard<std::mutex> lock(mutex);
// notify the thread that it can exit
@@ -91,7 +85,7 @@ void Thread::mainLoop(void *self_) {
return;
}
}
- ThreadPool::get()->notifyThreadIsReady();
+ self->parent->notifyThreadIsReady();
{
std::unique_lock<std::mutex> lock(self->mutex);
if (!self->done && !self->doWork) {
@@ -102,18 +96,26 @@ void Thread::mainLoop(void *self_) {
}
}
-
// ThreadPool
+// Global threadPool state. We have a singleton pool, which can only be
+// used from one place at a time.
+
+static std::unique_ptr<ThreadPool> pool;
+
+std::mutex ThreadPool::creationMutex;
+std::mutex ThreadPool::workMutex;
+std::mutex ThreadPool::threadMutex;
+
void ThreadPool::initialize(size_t num) {
if (num == 1) return; // no multiple cores, don't create threads
DEBUG_POOL("initialize()\n");
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
ready.store(threads.size()); // initial state before first resetThreadsAreReady()
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
try {
- threads.emplace_back(make_unique<Thread>());
+ threads.emplace_back(make_unique<Thread>(this));
} catch (std::system_error&) {
// failed to create a thread - don't use multithreading, as if num cores == 1
DEBUG_POOL("could not create thread\n");
@@ -140,21 +142,14 @@ size_t ThreadPool::getNumCores() {
ThreadPool* ThreadPool::get() {
DEBUG_POOL("::get()\n");
- bool created = false;
- {
- // lock on the creation
- std::lock_guard<std::mutex> lock(poolMutex);
- if (!pool) {
- DEBUG_POOL("::get() creating\n");
- created = true;
- pool = make_unique<ThreadPool>();
- }
- }
- if (created) {
- // if we created it here, do the initialization too. this
- // is outside of the mutex, as we create child threads who
- // will call ::get() themselves
- pool->initialize(getNumCores());
+ // lock on the creation
+ std::lock_guard<std::mutex> poolLock(creationMutex);
+ if (!pool) {
+ DEBUG_POOL("::get() creating\n");
+ std::unique_ptr<ThreadPool> temp = make_unique<ThreadPool>();
+ temp->initialize(getNumCores());
+ // assign it to the global location now that it is all ready
+ pool.swap(temp);
DEBUG_POOL("::get() created\n");
}
return pool.get();
@@ -173,10 +168,14 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
// run in parallel on threads
// TODO: fancy work stealing
DEBUG_POOL("work() on threads\n");
+ // lock globally on doing work in the pool - the threadPool can only be used
+ // from one thread at a time, all others must wait patiently
+ std::lock_guard<std::mutex> poolLock(workMutex);
assert(doWorkers.size() == num);
assert(!running);
+ DEBUG_POOL("running = true\n");
running = true;
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
threads[i]->work(doWorkers[i]);
@@ -184,6 +183,7 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
DEBUG_POOL("main thread waiting\n");
condition.wait(lock, [this]() { return areThreadsReady(); });
DEBUG_POOL("main thread waiting\n");
+ DEBUG_POOL("running = false\n");
running = false;
DEBUG_POOL("work() is done\n");
}
@@ -199,7 +199,7 @@ bool ThreadPool::isRunning() {
void ThreadPool::notifyThreadIsReady() {
DEBUG_POOL("notify thread is ready\n";)
- std::lock_guard<std::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(threadMutex);
ready.fetch_add(1);
condition.notify_one();
}
diff --git a/src/support/threads.h b/src/support/threads.h
index 0ec109e4d..280e19470 100644
--- a/src/support/threads.h
+++ b/src/support/threads.h
@@ -38,6 +38,8 @@ enum class ThreadWorkState {
Finished
};
+class ThreadPool;
+
//
// A helper thread.
//
@@ -45,6 +47,7 @@ enum class ThreadWorkState {
//
class Thread {
+ ThreadPool* parent;
std::unique_ptr<std::thread> thread;
std::mutex mutex;
std::condition_variable condition;
@@ -52,7 +55,7 @@ class Thread {
std::function<ThreadWorkState ()> doWork = nullptr;
public:
- Thread();
+ Thread(ThreadPool* parent);
~Thread();
// Start to do work, calling doWork() until
@@ -72,10 +75,17 @@ private:
class ThreadPool {
std::vector<std::unique_ptr<Thread>> threads;
bool running = false;
- std::mutex mutex;
std::condition_variable condition;
std::atomic<size_t> ready;
+ // A mutex for creating the pool safely
+ static std::mutex creationMutex;
+ // A mutex for work() so that the pool can only work on one
+ // thing at a time
+ static std::mutex workMutex;
+ // A mutex for communication with the worker threads
+ static std::mutex threadMutex;
+
private:
void initialize(size_t num);
diff --git a/test/example/cpp-threads.cpp b/test/example/cpp-threads.cpp
new file mode 100644
index 000000000..4a41249e7
--- /dev/null
+++ b/test/example/cpp-threads.cpp
@@ -0,0 +1,58 @@
+// test multiple uses of the threadPool
+
+#include <iostream>
+#include <thread>
+#include <vector>
+
+#include <binaryen-c.h>
+
+int NUM_THREADS = 33;
+
+void worker() {
+ BinaryenModuleRef module = BinaryenModuleCreate();
+
+ // Create a function type for i32 (i32, i32)
+ BinaryenType params[2] = { BinaryenTypeInt32(), BinaryenTypeInt32() };
+ BinaryenFunctionTypeRef iii = BinaryenAddFunctionType(module, "iii", BinaryenTypeInt32(), params, 2);
+
+ // Get the 0 and 1 arguments, and add them
+ BinaryenExpressionRef x = BinaryenGetLocal(module, 0, BinaryenTypeInt32()),
+ y = BinaryenGetLocal(module, 1, BinaryenTypeInt32());
+ BinaryenExpressionRef add = BinaryenBinary(module, BinaryenAddInt32(), x, y);
+ BinaryenExpressionRef ret = BinaryenReturn(module, add);
+
+ // Create the add function
+ // Note: no additional local variables
+ // Note: no basic blocks here, we are an AST. The function body is just an expression node.
+ BinaryenFunctionRef adder = BinaryenAddFunction(module, "adder", iii, NULL, 0, ret);
+
+ // validate it
+ BinaryenModuleValidate(module);
+
+ // optimize it
+ BinaryenModuleOptimize(module);
+ BinaryenModuleValidate(module);
+
+ // Clean up the module, which owns all the objects we created above
+ BinaryenModuleDispose(module);
+}
+
+int main()
+{
+ std::vector<std::thread> threads;
+
+ std::cout << "create threads...\n";
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads.emplace_back(worker);
+ }
+ std::cout << "threads running in parallel...\n";
+
+ std::cout << "waiting for threads to join...\n";
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ std::cout << "all done.\n";
+
+ return 0;
+}
diff --git a/test/example/cpp-threads.txt b/test/example/cpp-threads.txt
new file mode 100644
index 000000000..2c638aaab
--- /dev/null
+++ b/test/example/cpp-threads.txt
@@ -0,0 +1,4 @@
+create threads...
+threads running in parallel...
+waiting for threads to join...
+all done.