1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
/*
* Copyright 2016 WebAssembly Community Group participants
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef optimizing_incremental_module_builder_h
#define optimizing_incremental_module_builder_h
#include <wasm.h>
#include <support/threads.h>
namespace wasm {
#ifdef BINARYEN_THREAD_DEBUG
// Serializes debug output from concurrent threads so lines do not interleave.
// NOTE(review): relies on <iostream>, <mutex> and <thread> being pulled in by
// the headers included above.
static std::mutex debug;
// Wrapped in do/while(0) so that `DEBUG_THREAD(x);` is a single statement and
// is safe as the body of an unbraced if/else.
#define DEBUG_THREAD(x) do { std::lock_guard<std::mutex> lock(debug); std::cerr << "[OptimizingIncrementalModuleBuilder Threading (thread: " << std::this_thread::get_id() << ")] " << x; std::cerr << '\n'; } while (0)
#else
// No-op when thread debugging is disabled; still a single statement.
#define DEBUG_THREAD(x) do { } while (0)
#endif
//
// OptimizingIncrementalModuleBuilder
//
// Helps build wasm modules efficiently. If you build a module by
// adding function by function, and you want to optimize them, this class
// starts optimizing using worker threads *while you are still adding*.
// It runs function optimization passes at that time, and then at the end
// it runs global module-level optimization passes. The result is a fully
// optimized module, optimized while being generated.
//
// This might also be faster than normal module optimization since it
// runs all passes on each function, then goes on to the next function
// which is better for data locality.
//
// Usage: Create an instance, passing it the module and the total
// number of functions. Then call addFunction as you have
// new functions to add (this also adds it to the module). Finally,
// call finish() when all functions have been added.
//
// This avoids locking by using atomics. We allocate an array of nullptrs
// that represent all the functions, and as we add a function, we place it
// at the next index. Each worker will read from the start to the end,
// and when a non-nullptr is found, the worker optimizes that function and
// nulls it. There is also an end marker that is neither nullptr nor the address
// of a valid function; it marks entries that the main thread has not yet
// filled in. In other words,
// * the main thread fills everything with the end marker
// * the main thread transforms a marker entry into a function
// * workers pause when they see the marker
// * workers skip over nullptrs
// * workers transform functions into nullptrs, and optimize them
// * we keep an atomic count of the number of active workers and
// the number of optimized functions.
// * after adding a function, the main thread wakes up workers if
// it calculates there is work for them.
// * a lock is used for going to sleep and waking up.
// Locking should be rare, as optimization is
// generally slower than generation; in the optimal case, we never
// lock beyond the first step, and all further work is lock-free.
//
class OptimizingIncrementalModuleBuilder {
Module* wasm;
uint32_t numFunctions;
Function* endMarker;
std::atomic<Function*>* list;
uint32_t nextFunction; // only used on main thread
uint32_t numWorkers;
std::vector<std::unique_ptr<std::thread>> threads;
std::atomic<uint32_t> liveWorkers, activeWorkers, availableFuncs, finishedFuncs;
std::mutex mutex;
std::condition_variable condition;
bool finishing;
public:
// numFunctions must be equal to the number of functions allocated, or higher. Knowing
// this bounds helps avoid locking.
OptimizingIncrementalModuleBuilder(Module* wasm, Index numFunctions) : wasm(wasm), numFunctions(numFunctions), nextFunction(0), finishing(false) {
// prepare work list
endMarker = new Function();
list = new std::atomic<Function*>[numFunctions];
for (uint32_t i = 0; i < numFunctions; i++) {
list[i].store(endMarker);
}
// create workers
DEBUG_THREAD("creating workers");
numWorkers = ThreadPool::getNumCores();
assert(numWorkers >= 1);
liveWorkers.store(0);
activeWorkers.store(0);
for (uint32_t i = 0; i < numWorkers; i++) { // TODO: one less, and add it at the very end, to not compete with main thread?
createWorker();
}
waitUntilAllReady();
DEBUG_THREAD("workers are ready");
// prepare the rest of the initial state
availableFuncs.store(0);
finishedFuncs.store(0);
}
~OptimizingIncrementalModuleBuilder() {
delete[] list;
delete endMarker;
}
// Add a function to the module, and to be optimized
void addFunction(Function* func) {
wasm->addFunction(func);
queueFunction(func);
// wake workers if needed
auto wake = availableFuncs.load();
for (uint32_t i = 0; i < wake; i++) {
wakeWorker();
}
}
// All functions have been added, block until all are optimized, and then do
// global optimizations. When this returns, the module is ready and optimized.
void finish() {
DEBUG_THREAD("finish()ing");
assert(nextFunction == numFunctions);
wakeAllWorkers();
waitUntilAllFinished();
optimizeGlobally();
// TODO: clear side thread allocators from module allocator, as these threads were transient
}
private:
void createWorker() {
DEBUG_THREAD("create a worker");
threads.emplace_back(std::unique_ptr<std::thread>(new std::thread(workerMain, this)));
}
void wakeWorker() {
DEBUG_THREAD("wake a worker");
std::lock_guard<std::mutex> lock(mutex);
condition.notify_one();
}
void wakeAllWorkers() {
DEBUG_THREAD("wake all workers");
std::lock_guard<std::mutex> lock(mutex);
condition.notify_all();
}
void waitUntilAllReady() {
DEBUG_THREAD("wait until all workers are ready");
std::unique_lock<std::mutex> lock(mutex);
if (liveWorkers.load() < numWorkers) {
condition.wait(lock, [this]() { return liveWorkers.load() == numWorkers; });
}
}
void waitUntilAllFinished() {
DEBUG_THREAD("wait until all workers are finished");
{
std::unique_lock<std::mutex> lock(mutex);
finishing = true;
if (liveWorkers.load() > 0) {
condition.wait(lock, [this]() { return liveWorkers.load() == 0; });
}
}
DEBUG_THREAD("joining");
for (auto& thread : threads) thread->join();
DEBUG_THREAD("joined");
}
void queueFunction(Function* func) {
DEBUG_THREAD("queue function");
assert(nextFunction < numFunctions); // TODO: if we are given more than we expected, use a slower work queue?
list[nextFunction++].store(func);
availableFuncs++;
}
void optimizeGlobally() {
PassRunner passRunner(wasm);
passRunner.addDefaultGlobalOptimizationPasses();
passRunner.run();
}
// worker code
void optimizeFunction(Function* func) {
PassRunner passRunner(wasm);
passRunner.addDefaultFunctionOptimizationPasses();
passRunner.runFunction(func);
}
static void workerMain(void* param) {
DEBUG_THREAD("workerMain");
OptimizingIncrementalModuleBuilder* self = (OptimizingIncrementalModuleBuilder*)param;
{
std::lock_guard<std::mutex> lock(self->mutex);
self->liveWorkers++;
self->activeWorkers++;
self->condition.notify_all();
}
for (uint32_t i = 0; i < self->numFunctions; i++) {
DEBUG_THREAD("workerMain iteration " << i);
if (self->list[i].load() == self->endMarker) {
// sleep, this entry isn't ready yet
DEBUG_THREAD("workerMain sleep");
self->activeWorkers--;
{
std::unique_lock<std::mutex> lock(self->mutex);
if (!self->finishing) { // while waiting for the lock, things may have ended
self->condition.wait(lock);
}
}
// continue
DEBUG_THREAD("workerMain continue");
self->activeWorkers++;
i--;
continue;
}
DEBUG_THREAD("workerMain exchange item");
auto* func = self->list[i].exchange(nullptr);
if (func == nullptr) {
DEBUG_THREAD("workerMain sees was already taken");
continue; // someone else has taken this one
}
// we have work to do!
DEBUG_THREAD("workerMain work on " << size_t(func));
self->availableFuncs--;
self->optimizeFunction(func);
self->finishedFuncs++;
}
DEBUG_THREAD("workerMain ready to exit");
{
std::lock_guard<std::mutex> lock(self->mutex);
self->liveWorkers--;
self->condition.notify_all();
}
DEBUG_THREAD("workerMain exiting");
}
};
} // namespace wasm
#endif // optimizing_incremental_module_builder_h
|