diff options
-rwxr-xr-x | auto_update_tests.py | 2 | ||||
-rwxr-xr-x | check.py | 2 | ||||
-rw-r--r-- | src/support/threads.cpp | 62 | ||||
-rw-r--r-- | src/support/threads.h | 14 | ||||
-rw-r--r-- | test/example/cpp-threads.cpp | 58 | ||||
-rw-r--r-- | test/example/cpp-threads.txt | 4 |
6 files changed, 109 insertions, 33 deletions
diff --git a/auto_update_tests.py b/auto_update_tests.py index 10481cab5..050dab1d1 100755 --- a/auto_update_tests.py +++ b/auto_update_tests.py @@ -184,6 +184,8 @@ for t in sorted(os.listdir(os.path.join('test', 'example'))): src, '-c', '-o', 'example.o', '-Isrc', '-g', '-L' + libdir, '-pthread'] print 'build: ', ' '.join(extra) + if src.endswith('.cpp'): + extra += ['-std=c++11'] print os.getcwd() subprocess.check_call(extra) # Link against the binaryen C library DSO, using rpath @@ -505,6 +505,8 @@ def run_gcc_torture_tests(): # build the C file separately extra = [NATIVECC, src, '-c', '-o', 'example.o', '-I' + os.path.join(options.binaryen_root, 'src'), '-g', '-L' + os.path.join(options.binaryen_bin, '..', 'lib'), '-pthread'] + if src.endswith('.cpp'): + extra += ['-std=c++11'] print 'build: ', ' '.join(extra) subprocess.check_call(extra) # Link against the binaryen C library DSO, using an executable-relative rpath diff --git a/src/support/threads.cpp b/src/support/threads.cpp index c4f714f8e..ce880ac2d 100644 --- a/src/support/threads.cpp +++ b/src/support/threads.cpp @@ -30,7 +30,7 @@ #ifdef BINARYEN_THREAD_DEBUG static std::mutex debug; #define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; } -#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; } +#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL " << std::this_thread::get_id() << "] " << x; } #else #define DEBUG_THREAD(x) #define DEBUG_POOL(x) @@ -39,21 +39,15 @@ static std::mutex debug; namespace wasm { -// Global thread information - -static std::mutex poolMutex; -static std::unique_ptr<ThreadPool> pool; - - // Thread -Thread::Thread() { - assert(!ThreadPool::isRunning()); +Thread::Thread(ThreadPool* parent) : parent(parent) { + assert(!parent->isRunning()); thread = make_unique<std::thread>(mainLoop, this); } Thread::~Thread() { - assert(!ThreadPool::isRunning()); + assert(!parent->isRunning()); { std::lock_guard<std::mutex> lock(mutex); // notify the thread that it can exit @@ -91,7 +85,7 @@ void Thread::mainLoop(void *self_) { return; } } - ThreadPool::get()->notifyThreadIsReady(); + self->parent->notifyThreadIsReady(); { std::unique_lock<std::mutex> lock(self->mutex); if (!self->done && !self->doWork) { @@ -102,18 +96,26 @@ void Thread::mainLoop(void *self_) { } } - // ThreadPool +// Global threadPool state. We have a singleton pool, which can only be +// used from one place at a time. + +static std::unique_ptr<ThreadPool> pool; + +std::mutex ThreadPool::creationMutex; +std::mutex ThreadPool::workMutex; +std::mutex ThreadPool::threadMutex; + void ThreadPool::initialize(size_t num) { if (num == 1) return; // no multiple cores, don't create threads DEBUG_POOL("initialize()\n"); - std::unique_lock<std::mutex> lock(mutex); + std::unique_lock<std::mutex> lock(threadMutex); ready.store(threads.size()); // initial state before first resetThreadsAreReady() resetThreadsAreReady(); for (size_t i = 0; i < num; i++) { try { - threads.emplace_back(make_unique<Thread>()); + threads.emplace_back(make_unique<Thread>(this)); } catch (std::system_error&) { // failed to create a thread - don't use multithreading, as if num cores == 1 DEBUG_POOL("could not create thread\n"); @@ -140,21 +142,14 @@ size_t ThreadPool::getNumCores() { ThreadPool* ThreadPool::get() { DEBUG_POOL("::get()\n"); - bool created = false; - { - // lock on the creation - std::lock_guard<std::mutex> lock(poolMutex); - if (!pool) { - DEBUG_POOL("::get() creating\n"); - created = true; - pool = make_unique<ThreadPool>(); - } - } - if (created) { - // if we created it here, do the initialization too. this - // is outside of the mutex, as we create child threads who - // will call ::get() themselves - pool->initialize(getNumCores()); + // lock on the creation + std::lock_guard<std::mutex> poolLock(creationMutex); + if (!pool) { + DEBUG_POOL("::get() creating\n"); + std::unique_ptr<ThreadPool> temp = make_unique<ThreadPool>(); + temp->initialize(getNumCores()); + // assign it to the global location now that it is all ready + pool.swap(temp); DEBUG_POOL("::get() created\n"); } return pool.get(); @@ -173,10 +168,14 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) // run in parallel on threads // TODO: fancy work stealing DEBUG_POOL("work() on threads\n"); + // lock globally on doing work in the pool - the threadPool can only be used + // from one thread at a time, all others must wait patiently + std::lock_guard<std::mutex> poolLock(workMutex); assert(doWorkers.size() == num); assert(!running); + DEBUG_POOL("running = true\n"); running = true; - std::unique_lock<std::mutex> lock(mutex); + std::unique_lock<std::mutex> lock(threadMutex); resetThreadsAreReady(); for (size_t i = 0; i < num; i++) { threads[i]->work(doWorkers[i]); @@ -184,6 +183,7 @@ void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) DEBUG_POOL("main thread waiting\n"); condition.wait(lock, [this]() { return areThreadsReady(); }); DEBUG_POOL("main thread waiting\n"); + DEBUG_POOL("running = false\n"); running = false; DEBUG_POOL("work() is done\n"); } @@ -199,7 +199,7 @@ bool ThreadPool::isRunning() { void ThreadPool::notifyThreadIsReady() { DEBUG_POOL("notify thread is ready\n";) - std::lock_guard<std::mutex> lock(mutex); + std::lock_guard<std::mutex> lock(threadMutex); ready.fetch_add(1); condition.notify_one(); } diff --git a/src/support/threads.h b/src/support/threads.h index 0ec109e4d..280e19470 100644 --- a/src/support/threads.h +++ b/src/support/threads.h @@ -38,6 +38,8 @@ enum class ThreadWorkState { Finished }; +class ThreadPool; + // // A helper thread. // @@ -45,6 +47,7 @@ enum class ThreadWorkState { // class Thread { + ThreadPool* parent; std::unique_ptr<std::thread> thread; std::mutex mutex; std::condition_variable condition; @@ -52,7 +55,7 @@ class Thread { std::function<ThreadWorkState ()> doWork = nullptr; public: - Thread(); + Thread(ThreadPool* parent); ~Thread(); // Start to do work, calling doWork() until @@ -72,10 +75,17 @@ private: class ThreadPool { std::vector<std::unique_ptr<Thread>> threads; bool running = false; - std::mutex mutex; std::condition_variable condition; std::atomic<size_t> ready; + // A mutex for creating the pool safely + static std::mutex creationMutex; + // A mutex for work() so that the pool can only work on one + // thing at a time + static std::mutex workMutex; + // A mutex for communication with the worker threads + static std::mutex threadMutex; + private: void initialize(size_t num); diff --git a/test/example/cpp-threads.cpp b/test/example/cpp-threads.cpp new file mode 100644 index 000000000..4a41249e7 --- /dev/null +++ b/test/example/cpp-threads.cpp @@ -0,0 +1,58 @@ +// test multiple uses of the threadPool + +#include <iostream> +#include <thread> +#include <vector> + +#include <binaryen-c.h> + +int NUM_THREADS = 33; + +void worker() { + BinaryenModuleRef module = BinaryenModuleCreate(); + + // Create a function type for i32 (i32, i32) + BinaryenType params[2] = { BinaryenTypeInt32(), BinaryenTypeInt32() }; + BinaryenFunctionTypeRef iii = BinaryenAddFunctionType(module, "iii", BinaryenTypeInt32(), params, 2); + + // Get the 0 and 1 arguments, and add them + BinaryenExpressionRef x = BinaryenGetLocal(module, 0, BinaryenTypeInt32()), + y = BinaryenGetLocal(module, 1, BinaryenTypeInt32()); + BinaryenExpressionRef add = BinaryenBinary(module, BinaryenAddInt32(), x, y); + BinaryenExpressionRef ret = BinaryenReturn(module, add); + + // Create the add function + // Note: no additional local variables + // Note: no basic blocks here, we are an AST. The function body is just an expression node. + BinaryenFunctionRef adder = BinaryenAddFunction(module, "adder", iii, NULL, 0, ret); + + // validate it + BinaryenModuleValidate(module); + + // optimize it + BinaryenModuleOptimize(module); + BinaryenModuleValidate(module); + + // Clean up the module, which owns all the objects we created above + BinaryenModuleDispose(module); +} + +int main() +{ + std::vector<std::thread> threads; + + std::cout << "create threads...\n"; + for (int i = 0; i < NUM_THREADS; i++) { + threads.emplace_back(worker); + } + std::cout << "threads running in parallel...\n"; + + std::cout << "waiting for threads to join...\n"; + for (auto& thread : threads) { + thread.join(); + } + + std::cout << "all done.\n"; + + return 0; +} diff --git a/test/example/cpp-threads.txt b/test/example/cpp-threads.txt new file mode 100644 index 000000000..2c638aaab --- /dev/null +++ b/test/example/cpp-threads.txt @@ -0,0 +1,4 @@ +create threads... +threads running in parallel... +waiting for threads to join... +all done. |