/* * Copyright 2016 WebAssembly Community Group participants * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef optimizing_incremental_module_builder_h #define optimizing_incremental_module_builder_h #include #include namespace wasm { #ifdef BINARYEN_THREAD_DEBUG static std::mutex debug; #define DEBUG_THREAD(x) { std::lock_guard lock(debug); std::cerr << "[OptimizingIncrementalModuleBuilder Threading (thread: " << std::this_thread::get_id() << ")] " << x; std::cerr << '\n'; } #else #define DEBUG_THREAD(x) #endif // // OptimizingIncrementalModuleBuilder // // Helps build wasm modules efficiently. If you build a module by // adding function by function, and you want to optimize them, this class // starts optimizing using worker threads *while you are still adding*. // It runs function optimization passes at that time, and then at the end // it runs global module-level optimization passes. The result is a fully // optimized module, optimized while being generated. // // This might also be faster than normal module optimization since it // runs all passes on each function, then goes on to the next function // which is better for data locality. // // Usage: Create an instance, passing it the module and the total // number of functions. Then call addFunction as you have // new functions to add (this also adds it to the module). Finally, // call finish() when all functions have been added. // // This avoids locking by using atomics. We allocate an array of nullptrs // that represent all the functions, and as we add a function, we place it // at the next index. Each worker will read from the start to the end, // and when a non-nullptr is found, the worker optimizes that function and // nulls it. There is also an end marker that is not nullptr nor the address of // a valid function, which represents that beyond this point we have not // yet filled in. In other words, // * the main thread fills everything with the end marker // * the main thread transforms a marker entry into a function // * workers pause when they see the marker // * workers skip over nullptrs // * workers transform functions into nullptrs, and optimize them // * we keep an atomic count of the number of active workers and // the number of optimized functions. // * after adding a function, the main thread wakes up workers if // it calculates there is work for them. // * a lock is used for going to sleep and waking up. // Locking should be rare, as optimization is // generally slower than generation; in the optimal case, we never // lock beyond the first step, and all further work is lock-free. // class OptimizingIncrementalModuleBuilder { Module* wasm; uint32_t numFunctions; std::function addPrePasses; Function* endMarker; std::atomic* list; uint32_t nextFunction; // only used on main thread uint32_t numWorkers; std::vector> threads; std::atomic liveWorkers, activeWorkers, availableFuncs, finishedFuncs; std::mutex mutex; std::condition_variable condition; bool finishing; public: // numFunctions must be equal to the number of functions allocated, or higher. Knowing // this bounds helps avoid locking. OptimizingIncrementalModuleBuilder(Module* wasm, Index numFunctions, std::function addPrePasses) : wasm(wasm), numFunctions(numFunctions), addPrePasses(addPrePasses), endMarker(nullptr), list(nullptr), nextFunction(0), numWorkers(0), liveWorkers(0), activeWorkers(0), availableFuncs(0), finishedFuncs(0), finishing(false) { if (numFunctions == 0) { // special case: no functions to be optimized. Don't create any threads. return; } // Before parallelism, create all passes on the main thread here, to ensure // prepareToRun() is called for each pass before we start to optimize functions. { PassRunner passRunner(wasm); addPrePasses(passRunner); passRunner.addDefaultFunctionOptimizationPasses(); } // prepare work list endMarker = new Function(); list = new std::atomic[numFunctions]; for (uint32_t i = 0; i < numFunctions; i++) { list[i].store(endMarker); } // create workers DEBUG_THREAD("creating workers"); numWorkers = ThreadPool::getNumCores(); assert(numWorkers >= 1); liveWorkers.store(0); activeWorkers.store(0); for (uint32_t i = 0; i < numWorkers; i++) { // TODO: one less, and add it at the very end, to not compete with main thread? createWorker(); } waitUntilAllReady(); DEBUG_THREAD("workers are ready"); // prepare the rest of the initial state availableFuncs.store(0); finishedFuncs.store(0); } ~OptimizingIncrementalModuleBuilder() { delete[] list; delete endMarker; } // Add a function to the module, and to be optimized void addFunction(Function* func) { wasm->addFunction(func); queueFunction(func); // wake workers if needed auto wake = availableFuncs.load(); for (uint32_t i = 0; i < wake; i++) { wakeWorker(); } } // All functions have been added, block until all are optimized, and then do // global optimizations. When this returns, the module is ready and optimized. void finish() { DEBUG_THREAD("finish()ing"); assert(nextFunction == numFunctions); wakeAllWorkers(); waitUntilAllFinished(); optimizeGlobally(); // TODO: clear side thread allocators from module allocator, as these threads were transient } private: void createWorker() { DEBUG_THREAD("create a worker"); threads.emplace_back(make_unique(workerMain, this)); } void wakeWorker() { DEBUG_THREAD("wake a worker"); std::lock_guard lock(mutex); condition.notify_one(); } void wakeAllWorkers() { DEBUG_THREAD("wake all workers"); std::lock_guard lock(mutex); condition.notify_all(); } void waitUntilAllReady() { DEBUG_THREAD("wait until all workers are ready"); std::unique_lock lock(mutex); if (liveWorkers.load() < numWorkers) { condition.wait(lock, [this]() { return liveWorkers.load() == numWorkers; }); } } void waitUntilAllFinished() { DEBUG_THREAD("wait until all workers are finished"); { std::unique_lock lock(mutex); finishing = true; if (liveWorkers.load() > 0) { condition.wait(lock, [this]() { return liveWorkers.load() == 0; }); } } DEBUG_THREAD("joining"); for (auto& thread : threads) thread->join(); DEBUG_THREAD("joined"); } void queueFunction(Function* func) { DEBUG_THREAD("queue function"); assert(nextFunction < numFunctions); // TODO: if we are given more than we expected, use a slower work queue? list[nextFunction++].store(func); availableFuncs++; } void optimizeGlobally() { PassRunner passRunner(wasm); passRunner.addDefaultGlobalOptimizationPasses(); passRunner.run(); } // worker code void optimizeFunction(Function* func) { PassRunner passRunner(wasm); addPrePasses(passRunner); passRunner.addDefaultFunctionOptimizationPasses(); passRunner.runFunction(func); } static void workerMain(OptimizingIncrementalModuleBuilder* self) { DEBUG_THREAD("workerMain"); { std::lock_guard lock(self->mutex); self->liveWorkers++; self->activeWorkers++; self->condition.notify_all(); } for (uint32_t i = 0; i < self->numFunctions; i++) { DEBUG_THREAD("workerMain iteration " << i); if (self->list[i].load() == self->endMarker) { // sleep, this entry isn't ready yet DEBUG_THREAD("workerMain sleep"); self->activeWorkers--; { std::unique_lock lock(self->mutex); if (!self->finishing) { // while waiting for the lock, things may have ended self->condition.wait(lock); } } // continue DEBUG_THREAD("workerMain continue"); self->activeWorkers++; i--; continue; } DEBUG_THREAD("workerMain exchange item"); auto* func = self->list[i].exchange(nullptr); if (func == nullptr) { DEBUG_THREAD("workerMain sees was already taken"); continue; // someone else has taken this one } // we have work to do! DEBUG_THREAD("workerMain work on " << size_t(func)); self->availableFuncs--; self->optimizeFunction(func); self->finishedFuncs++; } DEBUG_THREAD("workerMain ready to exit"); { std::lock_guard lock(self->mutex); self->liveWorkers--; self->condition.notify_all(); } DEBUG_THREAD("workerMain exiting"); } }; } // namespace wasm #endif // optimizing_incremental_module_builder_h