summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xauto_update_tests.py2
-rwxr-xr-xcheck.py2
-rw-r--r--src/support/threads.cpp62
-rw-r--r--src/support/threads.h14
-rw-r--r--test/example/cpp-threads.cpp58
-rw-r--r--test/example/cpp-threads.txt4
6 files changed, 109 insertions, 33 deletions
diff --git a/auto_update_tests.py b/auto_update_tests.py
index 10481cab5..050dab1d1 100755
--- a/auto_update_tests.py
+++ b/auto_update_tests.py
@@ -184,6 +184,8 @@ for t in sorted(os.listdir(os.path.join('test', 'example'))):
src, '-c', '-o', 'example.o',
'-Isrc', '-g', '-L' + libdir, '-pthread']
print 'build: ', ' '.join(extra)
+ if src.endswith('.cpp'):
+ extra += ['-std=c++11']
print os.getcwd()
subprocess.check_call(extra)
# Link against the binaryen C library DSO, using rpath
diff --git a/check.py b/check.py
index 5a926bf8e..40092d93a 100755
--- a/check.py
+++ b/check.py
@@ -505,6 +505,8 @@ def run_gcc_torture_tests():
# build the C file separately
extra = [NATIVECC, src, '-c', '-o', 'example.o',
'-I' + os.path.join(options.binaryen_root, 'src'), '-g', '-L' + os.path.join(options.binaryen_bin, '..', 'lib'), '-pthread']
+ if src.endswith('.cpp'):
+ extra += ['-std=c++11']
print 'build: ', ' '.join(extra)
subprocess.check_call(extra)
# Link against the binaryen C library DSO, using an executable-relative rpath
diff --git a/src/support/threads.cpp b/src/support/threads.cpp
index c4f714f8e..ce880ac2d 100644
--- a/src/support/threads.cpp
+++ b/src/support/threads.cpp
@@ -30,7 +30,7 @@
#ifdef BINARYEN_THREAD_DEBUG
static std::mutex debug;
#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; }
-#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; }
+#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL " << std::this_thread::get_id() << "] " << x; }
#else
#define DEBUG_THREAD(x)
#define DEBUG_POOL(x)
@@ -39,21 +39,15 @@ static std::mutex debug;
namespace wasm {
-// Global thread information
-
-static std::mutex poolMutex;
-static std::unique_ptr<ThreadPool> pool;
-
-
// Thread
-Thread::Thread() {
- assert(!ThreadPool::isRunning());
+Thread::Thread(ThreadPool* parent) : parent(parent) {
+ assert(!parent->isRunning());
thread = make_unique<std::thread>(mainLoop, this);
}
Thread::~Thread() {
- assert(!ThreadPool::isRunning());
+ assert(!parent->isRunning());
{
std::lock_guard<std::mutex> lock(mutex);
// notify the thread that it can exit
@@ -91,7 +85,7 @@ void Thread::mainLoop(void *self_) {
return;
}
}
- ThreadPool::get()->notifyThreadIsReady();
+ self->parent->notifyThreadIsReady();
{
std::unique_lock<std::mutex> lock(self->mutex);
if (!self->done && !self->doWork) {
@@ -102,18 +96,26 @@ void Thread::mainLoop(void *self_) {
}
}
-
// ThreadPool
+// Global threadPool state. We have a singleton pool, which can only be
+// used from one place at a time.
+
+static std::unique_ptr<ThreadPool> pool;
+
+std::mutex ThreadPool::creationMutex;
+std::mutex ThreadPool::workMutex;
+std::mutex ThreadPool::threadMutex;
+
void ThreadPool::initialize(size_t num) {
if (num == 1) return; // no multiple cores, don't create threads
DEBUG_POOL("initialize()\n");
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
ready.store(threads.size()); // initial state before first resetThreadsAreReady()
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
try {
- threads.emplace_back(make_unique<Thread>());
+ threads.emplace_back(make_unique<Thread>(this));
} catch (std::system_error&) {
// failed to create a thread - don't use multithreading, as if num cores == 1
DEBUG_POOL("could not create thread\n");
@@ -140,21 +142,14 @@ size_t ThreadPool::getNumCores() {
ThreadPool* ThreadPool::get() {
DEBUG_POOL("::get()\n");
- bool created = false;
- {
- // lock on the creation
- std::lock_guard<std::mutex> lock(poolMutex);
- if (!pool) {
- DEBUG_POOL("::get() creating\n");
- created = true;
- pool = make_unique<ThreadPool>();
- }
- }
- if (created) {
- // if we created it here, do the initialization too. this
- // is outside of the mutex, as we create child threads who
- // will call ::get() themselves
- pool->initialize(getNumCores());
+ // lock on the creation
+ std::lock_guard<std::mutex> poolLock(creationMutex);
+ if (!pool) {
+ DEBUG_POOL("::get() creating\n");
+ std::unique_ptr<ThreadPool> temp = make_unique<ThreadPool>();
+ temp->initialize(getNumCores());
+ // assign it to the global location now that it is all ready
+ pool.swap(temp);
DEBUG_POOL("::get() created\n");
}
return pool.get();
@@ -173,10 +168,14 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
// run in parallel on threads
// TODO: fancy work stealing
DEBUG_POOL("work() on threads\n");
+ // lock globally on doing work in the pool - the threadPool can only be used
+ // from one thread at a time, all others must wait patiently
+ std::lock_guard<std::mutex> poolLock(workMutex);
assert(doWorkers.size() == num);
assert(!running);
+ DEBUG_POOL("running = true\n");
running = true;
- std::unique_lock<std::mutex> lock(mutex);
+ std::unique_lock<std::mutex> lock(threadMutex);
resetThreadsAreReady();
for (size_t i = 0; i < num; i++) {
threads[i]->work(doWorkers[i]);
@@ -184,6 +183,7 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers)
DEBUG_POOL("main thread waiting\n");
condition.wait(lock, [this]() { return areThreadsReady(); });
DEBUG_POOL("main thread waiting\n");
+ DEBUG_POOL("running = false\n");
running = false;
DEBUG_POOL("work() is done\n");
}
@@ -199,7 +199,7 @@ bool ThreadPool::isRunning() {
void ThreadPool::notifyThreadIsReady() {
DEBUG_POOL("notify thread is ready\n";)
- std::lock_guard<std::mutex> lock(mutex);
+ std::lock_guard<std::mutex> lock(threadMutex);
ready.fetch_add(1);
condition.notify_one();
}
diff --git a/src/support/threads.h b/src/support/threads.h
index 0ec109e4d..280e19470 100644
--- a/src/support/threads.h
+++ b/src/support/threads.h
@@ -38,6 +38,8 @@ enum class ThreadWorkState {
Finished
};
+class ThreadPool;
+
//
// A helper thread.
//
@@ -45,6 +47,7 @@ enum class ThreadWorkState {
//
class Thread {
+ ThreadPool* parent;
std::unique_ptr<std::thread> thread;
std::mutex mutex;
std::condition_variable condition;
@@ -52,7 +55,7 @@ class Thread {
std::function<ThreadWorkState ()> doWork = nullptr;
public:
- Thread();
+ Thread(ThreadPool* parent);
~Thread();
// Start to do work, calling doWork() until
@@ -72,10 +75,17 @@ private:
class ThreadPool {
std::vector<std::unique_ptr<Thread>> threads;
bool running = false;
- std::mutex mutex;
std::condition_variable condition;
std::atomic<size_t> ready;
+ // A mutex for creating the pool safely
+ static std::mutex creationMutex;
+ // A mutex for work() so that the pool can only work on one
+ // thing at a time
+ static std::mutex workMutex;
+ // A mutex for communication with the worker threads
+ static std::mutex threadMutex;
+
private:
void initialize(size_t num);
diff --git a/test/example/cpp-threads.cpp b/test/example/cpp-threads.cpp
new file mode 100644
index 000000000..4a41249e7
--- /dev/null
+++ b/test/example/cpp-threads.cpp
@@ -0,0 +1,58 @@
+// test multiple uses of the threadPool
+
+#include <iostream>
+#include <thread>
+#include <vector>
+
+#include <binaryen-c.h>
+
+int NUM_THREADS = 33;
+
+void worker() {
+ BinaryenModuleRef module = BinaryenModuleCreate();
+
+ // Create a function type for i32 (i32, i32)
+ BinaryenType params[2] = { BinaryenTypeInt32(), BinaryenTypeInt32() };
+ BinaryenFunctionTypeRef iii = BinaryenAddFunctionType(module, "iii", BinaryenTypeInt32(), params, 2);
+
+ // Get the 0 and 1 arguments, and add them
+ BinaryenExpressionRef x = BinaryenGetLocal(module, 0, BinaryenTypeInt32()),
+ y = BinaryenGetLocal(module, 1, BinaryenTypeInt32());
+ BinaryenExpressionRef add = BinaryenBinary(module, BinaryenAddInt32(), x, y);
+ BinaryenExpressionRef ret = BinaryenReturn(module, add);
+
+ // Create the add function
+ // Note: no additional local variables
+ // Note: no basic blocks here, we are an AST. The function body is just an expression node.
+ BinaryenFunctionRef adder = BinaryenAddFunction(module, "adder", iii, NULL, 0, ret);
+
+ // validate it
+ BinaryenModuleValidate(module);
+
+ // optimize it
+ BinaryenModuleOptimize(module);
+ BinaryenModuleValidate(module);
+
+ // Clean up the module, which owns all the objects we created above
+ BinaryenModuleDispose(module);
+}
+
+int main()
+{
+ std::vector<std::thread> threads;
+
+ std::cout << "create threads...\n";
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads.emplace_back(worker);
+ }
+ std::cout << "threads running in parallel...\n";
+
+ std::cout << "waiting for threads to join...\n";
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ std::cout << "all done.\n";
+
+ return 0;
+}
diff --git a/test/example/cpp-threads.txt b/test/example/cpp-threads.txt
new file mode 100644
index 000000000..2c638aaab
--- /dev/null
+++ b/test/example/cpp-threads.txt
@@ -0,0 +1,4 @@
+create threads...
+threads running in parallel...
+waiting for threads to join...
+all done.