diff options
Diffstat (limited to 'src/wasm-module-building.h')
-rw-r--r-- | src/wasm-module-building.h | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/src/wasm-module-building.h b/src/wasm-module-building.h new file mode 100644 index 000000000..3cdebc558 --- /dev/null +++ b/src/wasm-module-building.h @@ -0,0 +1,252 @@ +/* + * Copyright 2016 WebAssembly Community Group participants + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef optimizing_incremental_module_builder_h +#define optimizing_incremental_module_builder_h + +#include <wasm.h> +#include <support/threads.h> + +namespace wasm { + +#ifdef BINARYEN_THREAD_DEBUG +static std::mutex debug; +#define DEBUG_THREAD(x) { std::lock_guard<std::mutex> lock(debug); std::cerr << "[OptimizingIncrementalModuleBuilder Threading (thread: " << std::this_thread::get_id() << ")] " << x; std::cerr << '\n'; } +#else +#define DEBUG_THREAD(x) +#endif + +// +// OptimizingIncrementalModuleBuilder +// +// Helps build wasm modules efficiently. If you build a module by +// adding function by function, and you want to optimize them, this class +// starts optimizing using worker threads *while you are still adding*. +// It runs function optimization passes at that time, and then at the end +// it runs global module-level optimization passes. The result is a fully +// optimized module, optimized while being generated. +// +// This might also be faster than normal module optimization since it +// runs all passes on each function, then goes on to the next function +// which is better for data locality. +// +// Usage: Create an instance, passing it the module and the total +// number of functions. Then call addFunction as you have +// new functions to add (this also adds it to the module). Finally, +// call finish() when all functions have been added. +// +// This avoids locking by using atomics. We allocate an array of nullptrs +// that represent all the functions, and as we add a function, we place it +// at the next index. Each worker will read from the start to the end, +// and when a non-nullptr is found, the worker optimizes that function and +// nulls it. There is also an end marker that is not nullptr nor the address of +// a valid function, which represents that beyond this point we have not +// yet filled in. In other words, +// * the main thread fills everything with the end marker +// * the main thread transforms a marker entry into a function +// * workers pause when they see the marker +// * workers skip over nullptrs +// * workers transform functions into nullptrs, and optimize them +// * we keep an atomic count of the number of active workers and +// the number of optimized functions. +// * after adding a function, the main thread wakes up workers if +// it calculates there is work for them. +// * a lock is used for going to sleep and waking up. +// Locking should be rare, as optimization is +// generally slower than generation; in the optimal case, we never +// lock beyond the first step, and all further work is lock-free. +// + +class OptimizingIncrementalModuleBuilder { + Module* wasm; + uint32_t numFunctions; + Function* endMarker; + std::atomic<Function*>* list; + uint32_t nextFunction; // only used on main thread + uint32_t numWorkers; + std::vector<std::unique_ptr<std::thread>> threads; + std::atomic<uint32_t> liveWorkers, activeWorkers, availableFuncs, finishedFuncs; + std::mutex mutex; + std::condition_variable condition; + bool finishing; + +public: + // numFunctions must be equal to the number of functions allocated, or higher. Knowing + // this bounds helps avoid locking. + OptimizingIncrementalModuleBuilder(Module* wasm, Index numFunctions) : wasm(wasm), numFunctions(numFunctions), nextFunction(0), finishing(false) { + // prepare work list + endMarker = new Function(); + list = new std::atomic<Function*>[numFunctions]; + for (uint32_t i = 0; i < numFunctions; i++) { + list[i].store(endMarker); + } + // create workers + DEBUG_THREAD("creating workers"); + numWorkers = ThreadPool::getNumCores(); + assert(numWorkers >= 1); + liveWorkers.store(0); + activeWorkers.store(0); + for (uint32_t i = 0; i < numWorkers; i++) { // TODO: one less, and add it at the very end, to not compete with main thread? + createWorker(); + } + waitUntilAllReady(); + DEBUG_THREAD("workers are ready"); + // prepare the rest of the initial state + availableFuncs.store(0); + finishedFuncs.store(0); + } + + ~OptimizingIncrementalModuleBuilder() { + delete[] list; + delete endMarker; + } + + // Add a function to the module, and to be optimized + void addFunction(Function* func) { + wasm->addFunction(func); + queueFunction(func); + // wake workers if needed + auto wake = availableFuncs.load(); + for (uint32_t i = 0; i < wake; i++) { + wakeWorker(); + } + } + + // All functions have been added, block until all are optimized, and then do + // global optimizations. When this returns, the module is ready and optimized. + void finish() { + DEBUG_THREAD("finish()ing"); + assert(nextFunction == numFunctions); + wakeAllWorkers(); + waitUntilAllFinished(); + optimizeGlobally(); + // TODO: clear side thread allocators from module allocator, as these threads were transient + } + +private: + void createWorker() { + DEBUG_THREAD("create a worker"); + threads.emplace_back(std::unique_ptr<std::thread>(new std::thread(workerMain, this))); + } + + void wakeWorker() { + DEBUG_THREAD("wake a worker"); + std::lock_guard<std::mutex> lock(mutex); + condition.notify_one(); + } + + void wakeAllWorkers() { + DEBUG_THREAD("wake all workers"); + std::lock_guard<std::mutex> lock(mutex); + condition.notify_all(); + } + + void waitUntilAllReady() { + DEBUG_THREAD("wait until all workers are ready"); + std::unique_lock<std::mutex> lock(mutex); + if (liveWorkers.load() < numWorkers) { + condition.wait(lock, [this]() { return liveWorkers.load() == numWorkers; }); + } + } + + void waitUntilAllFinished() { + DEBUG_THREAD("wait until all workers are finished"); + { + std::unique_lock<std::mutex> lock(mutex); + finishing = true; + if (liveWorkers.load() > 0) { + condition.wait(lock, [this]() { return liveWorkers.load() == 0; }); + } + } + DEBUG_THREAD("joining"); + for (auto& thread : threads) thread->join(); + DEBUG_THREAD("joined"); + } + + void queueFunction(Function* func) { + DEBUG_THREAD("queue function"); + assert(nextFunction < numFunctions); // TODO: if we are given more than we expected, use a slower work queue? + list[nextFunction++].store(func); + availableFuncs++; + } + + void optimizeGlobally() { + PassRunner passRunner(wasm); + passRunner.addDefaultGlobalOptimizationPasses(); + passRunner.run(); + } + + // worker code + + void optimizeFunction(Function* func) { + PassRunner passRunner(wasm); + passRunner.addDefaultFunctionOptimizationPasses(); + passRunner.runFunction(func); + } + + static void workerMain(void* param) { + DEBUG_THREAD("workerMain"); + OptimizingIncrementalModuleBuilder* self = (OptimizingIncrementalModuleBuilder*)param; + { + std::lock_guard<std::mutex> lock(self->mutex); + self->liveWorkers++; + self->activeWorkers++; + self->condition.notify_all(); + } + for (uint32_t i = 0; i < self->numFunctions; i++) { + DEBUG_THREAD("workerMain iteration " << i); + if (self->list[i].load() == self->endMarker) { + // sleep, this entry isn't ready yet + DEBUG_THREAD("workerMain sleep"); + self->activeWorkers--; + { + std::unique_lock<std::mutex> lock(self->mutex); + if (!self->finishing) { // while waiting for the lock, things may have ended + self->condition.wait(lock); + } + } + // continue + DEBUG_THREAD("workerMain continue"); + self->activeWorkers++; + i--; + continue; + } + DEBUG_THREAD("workerMain exchange item"); + auto* func = self->list[i].exchange(nullptr); + if (func == nullptr) { + DEBUG_THREAD("workerMain sees was already taken"); + continue; // someone else has taken this one + } + // we have work to do! + DEBUG_THREAD("workerMain work on " << size_t(func)); + self->availableFuncs--; + self->optimizeFunction(func); + self->finishedFuncs++; + } + DEBUG_THREAD("workerMain ready to exit"); + { + std::lock_guard<std::mutex> lock(self->mutex); + self->liveWorkers--; + self->condition.notify_all(); + } + DEBUG_THREAD("workerMain exiting"); + } +}; + +} // namespace wasm + +#endif // optimizing_incremental_module_builder_h + |