diff options
-rw-r--r-- | CMakeLists.txt | 9 | ||||
-rwxr-xr-x | check.py | 2 | ||||
-rw-r--r-- | src/emscripten-optimizer/istring.h | 24 | ||||
-rw-r--r-- | src/mixed_arena.h | 7 | ||||
-rw-r--r-- | src/passes/MergeBlocks.cpp | 2 | ||||
-rw-r--r-- | src/passes/OptimizeInstructions.cpp | 2 | ||||
-rw-r--r-- | src/passes/PostEmscripten.cpp | 2 | ||||
-rw-r--r-- | src/passes/RemoveUnusedBrs.cpp | 2 | ||||
-rw-r--r-- | src/passes/RemoveUnusedNames.cpp | 2 | ||||
-rw-r--r-- | src/passes/ReorderLocals.cpp | 1 | ||||
-rw-r--r-- | src/passes/SimplifyLocals.cpp | 7 | ||||
-rw-r--r-- | src/passes/Vacuum.cpp | 2 | ||||
-rw-r--r-- | src/s2wasm.h | 5 | ||||
-rw-r--r-- | src/support/threads.cpp | 181 | ||||
-rw-r--r-- | src/support/threads.h | 107 | ||||
-rw-r--r-- | src/wasm-traversal.h | 67 |
16 files changed, 394 insertions, 28 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index a529410b6..f95e8aeca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,6 +20,13 @@ FUNCTION(ADD_COMPILE_FLAG value) ENDFOREACH(variable) ENDFUNCTION() +FUNCTION(ADD_LINK_FLAG value) + MESSAGE(STATUS "Linking with ${value}") + FOREACH(variable CMAKE_EXE_LINKER_FLAGS) + SET(${variable} "${${variable}} ${value}" PARENT_SCOPE) + ENDFOREACH(variable) +ENDFUNCTION() + # Compiler setup. INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/src) @@ -51,6 +58,7 @@ ELSE() ADD_COMPILE_FLAG("-Wextra") ADD_COMPILE_FLAG("-Wno-unused-parameter") ADD_COMPILE_FLAG("-fno-omit-frame-pointer") + ADD_LINK_FLAG("-pthread") IF(uppercase_CMAKE_BUILD_TYPE STREQUAL "DEBUG") ADD_COMPILE_FLAG("-O0") ADD_COMPILE_FLAG("-g3") @@ -75,6 +83,7 @@ SET(support_SOURCES src/support/command-line.cpp src/support/file.cpp src/support/safe_integer.cpp + src/support/threads.cpp ) ADD_LIBRARY(support STATIC ${support_SOURCES}) @@ -573,7 +573,7 @@ cmd = [os.environ.get('CXX') or 'g++', '-std=c++11', os.path.join('test', 'example', 'find_div0s.cpp'), os.path.join('src', 'pass.cpp'), os.path.join('src', 'passes', 'Print.cpp'), - '-Isrc', '-g', '-lsupport', '-Llib/.'] + '-Isrc', '-g', '-lsupport', '-Llib/.', '-pthread'] if os.environ.get('COMPILER_FLAGS'): for f in os.environ.get('COMPILER_FLAGS').split(' '): cmd.append(f) diff --git a/src/emscripten-optimizer/istring.h b/src/emscripten-optimizer/istring.h index 8e91d044d..149f77d23 100644 --- a/src/emscripten-optimizer/istring.h +++ b/src/emscripten-optimizer/istring.h @@ -29,6 +29,8 @@ #include <stdio.h> #include <assert.h> +#include "support/threads.h" + namespace cashew { struct IString { @@ -66,22 +68,24 @@ struct IString { typedef std::unordered_set<const char *, CStringHash, CStringEqual> StringSet; static StringSet* strings = new StringSet(); - if (reuse) { - auto result = strings->insert(s); // if already present, does nothing - str = *(result.first); - } else { - auto existing = strings->find(s); - if (existing == strings->end()) { + auto existing = strings->find(s); + + if (existing == strings->end()) { + // the StringSet cache is a global shared structure, which should + // not be modified by multiple threads at once. + assert(!wasm::ThreadPool::isRunning()); + if (!reuse) { size_t len = strlen(s) + 1; char *copy = (char*)malloc(len); // XXX leaked strncpy(copy, s, len); s = copy; - strings->insert(s); - } else { - s = *existing; } - str = s; + strings->insert(s); + } else { + s = *existing; } + + str = s; } void set(const IString &s) { diff --git a/src/mixed_arena.h b/src/mixed_arena.h index 82464d51e..dc8cbad5b 100644 --- a/src/mixed_arena.h +++ b/src/mixed_arena.h @@ -19,6 +19,8 @@ #include <vector> +#include "support/threads.h" + // // Arena allocation for mixed-type data. // @@ -29,6 +31,9 @@ struct MixedArena { template<class T> T* alloc() { + // this structure should not be modified by multiple threads at once. + assert(!wasm::ThreadPool::isRunning()); + const size_t CHUNK = 10000; size_t currSize = (sizeof(T) + 7) & (-8); // same alignment as malloc TODO optimize? assert(currSize < CHUNK); @@ -43,6 +48,8 @@ struct MixedArena { } void clear() { + assert(!wasm::ThreadPool::isRunning()); + for (char* chunk : chunks) { delete[] chunk; } diff --git a/src/passes/MergeBlocks.cpp b/src/passes/MergeBlocks.cpp index 578d4fc45..5033d4016 100644 --- a/src/passes/MergeBlocks.cpp +++ b/src/passes/MergeBlocks.cpp @@ -24,6 +24,8 @@ namespace wasm { struct MergeBlocks : public WalkerPass<PostWalker<MergeBlocks>> { + bool isFunctionParallel() { return true; } + void visitBlock(Block *curr) { bool more = true; while (more) { diff --git a/src/passes/OptimizeInstructions.cpp b/src/passes/OptimizeInstructions.cpp index ca79468f5..28cac726a 100644 --- a/src/passes/OptimizeInstructions.cpp +++ b/src/passes/OptimizeInstructions.cpp @@ -26,6 +26,8 @@ namespace wasm { struct OptimizeInstructions : public WalkerPass<PostWalker<OptimizeInstructions>> { + bool isFunctionParallel() { return true; } + void visitIf(If* curr) { // flip branches to get rid of an i32.eqz if (curr->ifFalse) { diff --git a/src/passes/PostEmscripten.cpp b/src/passes/PostEmscripten.cpp index effbad30a..ef01b11fe 100644 --- a/src/passes/PostEmscripten.cpp +++ b/src/passes/PostEmscripten.cpp @@ -25,6 +25,8 @@ namespace wasm { struct PostEmscripten : public WalkerPass<PostWalker<PostEmscripten>> { + bool isFunctionParallel() { return true; } + // When we have a Load from a local value (typically a GetLocal) plus a constant offset, // we may be able to fold it in. // The semantics of the Add are to wrap, while wasm offset semantics purposefully do diff --git a/src/passes/RemoveUnusedBrs.cpp b/src/passes/RemoveUnusedBrs.cpp index 41db36d2c..6718fc14b 100644 --- a/src/passes/RemoveUnusedBrs.cpp +++ b/src/passes/RemoveUnusedBrs.cpp @@ -24,6 +24,8 @@ namespace wasm { struct RemoveUnusedBrs : public WalkerPass<PostWalker<RemoveUnusedBrs>> { + bool isFunctionParallel() { return true; } + // preparation: try to unify branches, as the fewer there are, the higher a chance we can remove them // specifically for if-else, turn an if-else with branches to the same target at the end of each // child, and with a value, to a branch to that target containing the if-else diff --git a/src/passes/RemoveUnusedNames.cpp b/src/passes/RemoveUnusedNames.cpp index 71569eefb..d33b5081a 100644 --- a/src/passes/RemoveUnusedNames.cpp +++ b/src/passes/RemoveUnusedNames.cpp @@ -24,6 +24,8 @@ namespace wasm { struct RemoveUnusedNames : public WalkerPass<PostWalker<RemoveUnusedNames>> { + bool isFunctionParallel() { return true; } + // We maintain a list of branches that we saw in children, then when we reach // a parent block, we know if it was branched to std::set<Name> branchesSeen; diff --git a/src/passes/ReorderLocals.cpp b/src/passes/ReorderLocals.cpp index ca046773b..66ea131de 100644 --- a/src/passes/ReorderLocals.cpp +++ b/src/passes/ReorderLocals.cpp @@ -27,6 +27,7 @@ namespace wasm { struct ReorderLocals : public WalkerPass<PostWalker<ReorderLocals>> { + bool isFunctionParallel() { return true; } std::map<Name, uint32_t> counts; diff --git a/src/passes/SimplifyLocals.cpp b/src/passes/SimplifyLocals.cpp index 408fea7ab..6220d0780 100644 --- a/src/passes/SimplifyLocals.cpp +++ b/src/passes/SimplifyLocals.cpp @@ -33,6 +33,8 @@ namespace wasm { struct SimplifyLocals : public WalkerPass<LinearExecutionWalker<SimplifyLocals>> { + bool isFunctionParallel() { return true; } + struct SinkableInfo { Expression** item; EffectAnalyzer effects; @@ -157,7 +159,7 @@ struct SimplifyLocals : public WalkerPass<LinearExecutionWalker<SimplifyLocals>> self->pushTask(visitPre, currp); } - void startWalk(Function *func) { + void walk(Expression*& root) { // multiple passes may be required per function, consider this: // x = load // y = store @@ -166,7 +168,7 @@ struct SimplifyLocals : public WalkerPass<LinearExecutionWalker<SimplifyLocals>> do { sunk = false; // main operation - walk(func->body); + WalkerPass<LinearExecutionWalker<SimplifyLocals>>::walk(root); // after optimizing a function, we can see if we have set_locals // for a local with no remaining gets, in which case, we can // remove the set. @@ -192,6 +194,7 @@ struct SimplifyLocals : public WalkerPass<LinearExecutionWalker<SimplifyLocals>> // clean up numGetLocals.clear(); setLocalOrigins.clear(); + sinkables.clear(); } while (sunk); } }; diff --git a/src/passes/Vacuum.cpp b/src/passes/Vacuum.cpp index ef83958c4..060dcd9dc 100644 --- a/src/passes/Vacuum.cpp +++ b/src/passes/Vacuum.cpp @@ -24,6 +24,8 @@ namespace wasm { struct Vacuum : public WalkerPass<PostWalker<Vacuum>> { + bool isFunctionParallel() { return true; } + void visitBlock(Block *curr) { // compress out nops int skip = 0; diff --git a/src/s2wasm.h b/src/s2wasm.h index 70b6515ed..1edbb7b6b 100644 --- a/src/s2wasm.h +++ b/src/s2wasm.h @@ -1402,8 +1402,6 @@ public: std::map<std::string, size_t> ids; std::set<std::string> allSigs; - AsmConstWalker(S2WasmBuilder* parent) : parent(parent) {} - void visitCallImport(CallImport* curr) { if (curr->target == EMSCRIPTEN_ASM_CONST) { auto arg = curr->operands[0]->cast<Const>(); @@ -1455,7 +1453,8 @@ public: return code; } }; - AsmConstWalker walker(this); + AsmConstWalker walker; + walker.parent = this; walker.startWalk(&wasm); // print o << "\"asmConsts\": {"; diff --git a/src/support/threads.cpp b/src/support/threads.cpp new file mode 100644 index 000000000..6edff21cb --- /dev/null +++ b/src/support/threads.cpp @@ -0,0 +1,181 @@ +/* + * Copyright 2016 WebAssembly Community Group participants + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <assert.h> + +#include <iostream> + +#include "threads.h" + + +// debugging tools + +#ifdef BINARYEN_THREAD_DEBUG +static std::mutex debug; +#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[THREAD " << std::this_thread::get_id() << "] " << x; } +#define DEBUG_POOL(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[POOL] " << x; } +#else +#define DEBUG_THREAD(x) +#define DEBUG_POOL(x) +#endif + + +namespace wasm { + +// Global thread information + +static std::unique_ptr<ThreadPool> pool; + + +// Thread + +Thread::Thread() { + assert(!ThreadPool::get()->isRunning()); + thread = std::unique_ptr<std::thread>(new std::thread(mainLoop, this)); +} + +Thread::~Thread() { + assert(!ThreadPool::get()->isRunning()); + { + std::lock_guard<std::mutex> lock(mutex); + // notify the thread that it can exit + done = true; + condition.notify_one(); + } + thread->join(); +} + +void Thread::work(std::function<ThreadWorkState ()> doWork_) { + // TODO: fancy work stealing + DEBUG_THREAD("send work to thread\n"); + { + std::lock_guard<std::mutex> lock(mutex); + // notify the thread that it can do some work + doWork = doWork_; + condition.notify_one(); + DEBUG_THREAD("work sent\n"); + } +} + +void Thread::mainLoop(void *self_) { + auto* self = static_cast<Thread*>(self_); + while (1) { + DEBUG_THREAD("checking for work\n"); + { + std::unique_lock<std::mutex> lock(self->mutex); + if (self->doWork) { + DEBUG_THREAD("doing work\n"); + // run tasks until they are all done + while (self->doWork() == ThreadWorkState::More) {} + self->doWork = nullptr; + } else if (self->done) { + DEBUG_THREAD("done\n"); + return; + } + } + ThreadPool::get()->notifyThreadIsReady(); + { + std::unique_lock<std::mutex> lock(self->mutex); + if (!self->done && !self->doWork) { + DEBUG_THREAD("thread waiting\n"); + self->condition.wait(lock); + } + } + } +} + + +// ThreadPool + +void ThreadPool::initialize(size_t num) { + if (num == 1) return; // no multiple cores, don't create threads + DEBUG_POOL("initialize()\n"); + std::unique_lock<std::mutex> lock(mutex); + ready.store(threads.size()); // initial state before first resetThreadsAreReady() + resetThreadsAreReady(); + for (size_t i = 0; i < num; i++) { + threads.emplace_back(std::unique_ptr<Thread>(new Thread())); + } + DEBUG_POOL("initialize() waiting\n"); + condition.wait(lock, [this]() { return areThreadsReady(); }); + DEBUG_POOL("initialize() is done\n"); +} + +ThreadPool* ThreadPool::get() { + if (!pool) { + size_t num = std::max(1U, std::thread::hardware_concurrency()); + pool = std::unique_ptr<ThreadPool>(new ThreadPool()); + pool->initialize(num); + } + return pool.get(); +} + +void ThreadPool::work(std::vector<std::function<ThreadWorkState ()>>& doWorkers) { + size_t num = threads.size(); + // If no multiple cores, or on a side thread, do not use worker threads + if (num == 0) { + // just run sequentially + DEBUG_POOL("work() sequentially\n"); + assert(doWorkers.size() > 0); + while (doWorkers[0]() == ThreadWorkState::More) {} + return; + } + // run in parallel on threads + // TODO: fancy work stealing + DEBUG_POOL("work() on threads\n"); + assert(doWorkers.size() == num); + assert(!running); + running = true; + std::unique_lock<std::mutex> lock(mutex); + resetThreadsAreReady(); + for (size_t i = 0; i < num; i++) { + threads[i]->work(doWorkers[i]); + } + DEBUG_POOL("main thread waiting\n"); + condition.wait(lock, [this]() { return areThreadsReady(); }); + DEBUG_POOL("main thread waiting\n"); + running = false; + DEBUG_POOL("work() is done\n"); +} + +size_t ThreadPool::size() { + return threads.size(); +} + +bool ThreadPool::isRunning() { + return pool && pool->running; +} + +void ThreadPool::notifyThreadIsReady() { + DEBUG_POOL("notify thread is ready\n";) + std::lock_guard<std::mutex> lock(mutex); + ready.fetch_add(1); + condition.notify_one(); +} + +void ThreadPool::resetThreadsAreReady() { + DEBUG_POOL("reset threads are ready\n";) + auto old = ready.exchange(0); + assert(old == threads.size()); +} + +bool ThreadPool::areThreadsReady() { + DEBUG_POOL("are threads ready?\n";) + return ready.load() == threads.size(); +} + +} // namespace wasm + diff --git a/src/support/threads.h b/src/support/threads.h new file mode 100644 index 000000000..9a43ac699 --- /dev/null +++ b/src/support/threads.h @@ -0,0 +1,107 @@ +/* + * Copyright 2016 WebAssembly Community Group participants + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// Threads helpers. +// + +#ifndef wasm_support_threads_h +#define wasm_support_threads_h + +#include <atomic> +#include <condition_variable> +#include <memory> +#include <mutex> +#include <thread> +#include <vector> + +namespace wasm { + +// The work state of a helper thread - is there more to do, +// or are we finished for now. +enum class ThreadWorkState { + More, + Finished +}; + +// +// A helper thread. +// +// You can only create and destroy these on the main thread. +// + +class Thread { + std::unique_ptr<std::thread> thread; + std::mutex mutex; + std::condition_variable condition; + bool done = false; + std::function<ThreadWorkState ()> doWork = nullptr; + +public: + Thread(); + ~Thread(); + + // Start to do work, calling doWork() until + // it returns false. + void work(std::function<ThreadWorkState ()> doWork); + +private: + static void mainLoop(void *self); +}; + +// +// A pool of helper threads. +// +// There is only one, to avoid recursive pools using too many cores. +// + +class ThreadPool { + std::vector<std::unique_ptr<Thread>> threads; + bool running = false; + std::mutex mutex; + std::condition_variable condition; + std::atomic<size_t> ready; + +private: + void initialize(size_t num); + +public: + // Get the singleton threadpool. This can return null + // if there is just one thread available. + static ThreadPool* get(); + + // Execute a bunch of tasks by the pool. This calls + // getTask() (in a thread-safe manner) to get tasks, and + // sends them to workers to be executed. This method + // blocks until all tasks are complete. + void work(std::vector<std::function<ThreadWorkState ()>>& doWorkers); + + size_t size(); + + static bool isRunning(); + + // Called by helper threads when they are free and ready. + void notifyThreadIsReady(); + +private: + void resetThreadsAreReady(); + + bool areThreadsReady(); +}; + +} // namespace wasm + +#endif // wasm_support_threads_h diff --git a/src/wasm-traversal.h b/src/wasm-traversal.h index efbe12586..031343d46 100644 --- a/src/wasm-traversal.h +++ b/src/wasm-traversal.h @@ -28,6 +28,7 @@ #define wasm_traversal_h #include "wasm.h" +#include "support/threads.h" namespace wasm { @@ -112,22 +113,29 @@ struct Walker : public Visitor<SubType> { // passes that need to do the same thing for every node type. void visitExpression(Expression* curr) {} + // Function parallelism. By default, walks are not run in parallel, but you + // can override this method to say that functions are parallelizable. This + // should always be safe *unless* you do something in the pass that makes it + // not thread-safe; in other words, the Module and Function objects and + // so forth are set up so that Functions can be processed in parallel, so + // if you do not ad global state that could be raced on, your pass could be + // function-parallel. + // + // Function-parallel passes create an instance of the Walker class per core. + // That means that you can't rely on Walker object properties to persist across + // your functions, and you can't expect a new object to be created for each + // function either (which could be very inefficient). + bool isFunctionParallel() { return false; } + // Node replacing as we walk - call replaceCurrent from // your visitors. - Expression *replace = nullptr; - void replaceCurrent(Expression *expression) { replace = expression; } // Walk starting - void startWalk(Function *func) { - SubType* self = static_cast<SubType*>(this); - self->walk(func->body); - } - void startWalk(Module *module) { // Dispatch statically through the SubType. SubType* self = static_cast<SubType*>(this); @@ -140,9 +148,42 @@ struct Walker : public Visitor<SubType> { for (auto curr : module->exports) { self->visitExport(curr); } - for (auto curr : module->functions) { - self->startWalk(curr); - self->visitFunction(curr); + + // if this is not a function-parallel traversal, run + // sequentially + if (!self->isFunctionParallel()) { + for (auto curr : module->functions) { + self->walk(curr->body); + self->visitFunction(curr); + } + } else { + // execute in parallel on helper threads + size_t num = ThreadPool::get()->size(); + std::vector<std::unique_ptr<SubType>> instances; + std::vector<std::function<ThreadWorkState ()>> doWorkers; + std::atomic<size_t> nextFunction; + nextFunction.store(0); + size_t numFunctions = module->functions.size(); + for (size_t i = 0; i < num; i++) { + auto* instance = new SubType(); + instances.push_back(std::unique_ptr<SubType>(instance)); + doWorkers.push_back([instance, &nextFunction, numFunctions, &module]() { + auto index = nextFunction.fetch_add(1); + // get the next task, if there is one + if (index >= numFunctions) { + return ThreadWorkState::Finished; // nothing left + } + Function* curr = module->functions[index]; + // do the current task + instance->walk(curr->body); + instance->visitFunction(curr); + if (index + 1 == numFunctions) { + return ThreadWorkState::Finished; // we did the last one + } + return ThreadWorkState::More; + }); + } + ThreadPool::get()->work(doWorkers); } self->visitTable(&module->table); self->visitMemory(&module->memory); @@ -161,8 +202,6 @@ struct Walker : public Visitor<SubType> { Task(TaskFunc func, Expression** currp) : func(func), currp(currp) {} }; - std::vector<Task> stack; - void pushTask(TaskFunc func, Expression** currp) { stack.emplace_back(func, currp); } @@ -216,6 +255,10 @@ struct Walker : public Visitor<SubType> { static void doVisitHost(SubType* self, Expression** currp) { self->visitExpression(*currp); self->visitHost((*currp)->cast<Host>()); } static void doVisitNop(SubType* self, Expression** currp) { self->visitExpression(*currp); self->visitNop((*currp)->cast<Nop>()); } static void doVisitUnreachable(SubType* self, Expression** currp) { self->visitExpression(*currp); self->visitUnreachable((*currp)->cast<Unreachable>()); } + +private: + Expression *replace = nullptr; // a node to replace + std::vector<Task> stack; // stack of tasks }; // Walks in post-order, i.e., children first. When there isn't an obvious |