1 | // [Blend2D] |
2 | // 2D Vector Graphics Powered by a JIT Compiler. |
3 | // |
4 | // [License] |
5 | // Zlib - See LICENSE.md file in the package. |
6 | |
7 | #include "./blapi-build_p.h" |
8 | #include "./blbitarray_p.h" |
9 | #include "./blruntime_p.h" |
10 | #include "./blsupport_p.h" |
11 | #include "./blthreading_p.h" |
12 | #include "./blthreadpool_p.h" |
13 | |
14 | #ifdef _WIN32 |
15 | #include <process.h> |
16 | #endif |
17 | |
18 | // ============================================================================ |
19 | // [Globals] |
20 | // ============================================================================ |
21 | |
// Virtual function table shared by every `BLInternalThreadPool` instance. Each
// instance points to it from its constructor and `blThreadPoolRtInit()` fills
// in the function pointers.
static BLThreadPoolVirt blThreadPoolVirt;
23 | |
24 | // ============================================================================ |
25 | // [BLThreadPool - Internal] |
26 | // ============================================================================ |
27 | |
28 | class BLInternalThreadPool : public BLThreadPool { |
29 | public: |
30 | enum : uint32_t { kMaxThreadCount = 64 }; |
31 | typedef BLFixedBitArray<BLBitWord, kMaxThreadCount> BitArray; |
32 | |
33 | volatile size_t refCount; |
34 | volatile uint32_t stackSize; |
35 | volatile uint32_t maxThreadCount; |
36 | volatile uint32_t createdThreadCount; |
37 | volatile uint32_t pooledThreadCount; |
38 | volatile uint32_t acquiredThreadCount; |
39 | volatile uint32_t destroyWaitTimeInMS; |
40 | |
41 | BLMutex mutex; |
42 | BLThreadEvent threadsDestroyed; |
43 | BLThreadAttributes threadAttributes; |
44 | BitArray pooledThreadBits; |
45 | BLThread* threads[kMaxThreadCount]; |
46 | |
47 | #ifdef _WIN32 |
48 | #else |
49 | pthread_attr_t ptAttr; |
50 | #endif |
51 | |
52 | // No need to explicitly initialize anything as it should be zero initialized. |
53 | BLInternalThreadPool() noexcept |
54 | : BLThreadPool { &blThreadPoolVirt }, |
55 | refCount(1), |
56 | stackSize(0), |
57 | maxThreadCount(kMaxThreadCount), |
58 | createdThreadCount(0), |
59 | pooledThreadCount(0), |
60 | acquiredThreadCount(0), |
61 | destroyWaitTimeInMS(200), |
62 | mutex(), |
63 | threadsDestroyed(true, true), |
64 | threadAttributes {}, |
65 | pooledThreadBits {}, |
66 | threads {} { init(); } |
67 | |
68 | ~BLInternalThreadPool() noexcept { |
69 | if (isInitialized()) { |
70 | uint32_t tries = 3; |
71 | uint64_t waitTime = (uint64_t(destroyWaitTimeInMS) * 1000u) / tries; |
72 | |
73 | do { |
74 | cleanup(); |
75 | if (threadsDestroyed.timedWait(waitTime) == BL_SUCCESS) |
76 | break; |
77 | } while (--tries); |
78 | |
79 | destroy(); |
80 | } |
81 | } |
82 | |
83 | BL_INLINE bool isInitialized() noexcept { |
84 | return threadsDestroyed.isInitialized(); |
85 | } |
86 | |
87 | #ifdef _WIN32 |
88 | BL_INLINE void init() noexcept {} |
89 | BL_INLINE void destroy() noexcept {} |
90 | #else |
91 | BL_INLINE void init() noexcept { |
92 | int err1 = pthread_attr_init(&ptAttr); |
93 | BL_ASSERT(!err1); |
94 | BL_UNUSED(err1); |
95 | |
96 | int err2 = pthread_attr_setdetachstate(&ptAttr, PTHREAD_CREATE_DETACHED); |
97 | BL_ASSERT(!err2); |
98 | BL_UNUSED(err2); |
99 | } |
100 | |
101 | BL_INLINE void destroy() noexcept { |
102 | int err = pthread_attr_destroy(&ptAttr); |
103 | BL_ASSERT(!err); |
104 | BL_UNUSED(err); |
105 | } |
106 | #endif |
107 | }; |
108 | |
109 | // ============================================================================ |
110 | // [BLThreadPool - Create / Destroy] |
111 | // ============================================================================ |
112 | |
113 | static void blThreadPoolDestroy(BLInternalThreadPool* self) noexcept { |
114 | self->~BLInternalThreadPool(); |
115 | free(self); |
116 | } |
117 | |
118 | BLThreadPool* blThreadPoolCreate() noexcept { |
119 | void* p = malloc(sizeof(BLInternalThreadPool)); |
120 | if (!p) |
121 | return nullptr; |
122 | |
123 | BLInternalThreadPool* self = new(p) BLInternalThreadPool(); |
124 | if (!self->isInitialized()) { |
125 | blThreadPoolDestroy(self); |
126 | return nullptr; |
127 | } |
128 | |
129 | return self; |
130 | } |
131 | |
132 | // ============================================================================ |
133 | // [BLThreadPool - AddRef / Release] |
134 | // ============================================================================ |
135 | |
136 | static BLThreadPool* BL_CDECL blThreadPoolAddRef(BLThreadPool* self_) noexcept { |
137 | BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_); |
138 | if (self->refCount != 0) |
139 | blAtomicFetchAdd(&self->refCount); |
140 | return self; |
141 | } |
142 | |
143 | static BLResult BL_CDECL blThreadPoolRelease(BLThreadPool* self_) noexcept { |
144 | BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_); |
145 | if (self->refCount != 0 && blAtomicFetchSub(&self->refCount) == 1) |
146 | blThreadPoolDestroy(self); |
147 | return BL_SUCCESS; |
148 | } |
149 | |
150 | // ============================================================================ |
151 | // [BLThreadPool - Properties] |
152 | // ============================================================================ |
153 | |
154 | static uint32_t BL_CDECL blThreadPoolMaxThreadCount(const BLThreadPool* self_) noexcept { |
155 | const BLInternalThreadPool* self = static_cast<const BLInternalThreadPool*>(self_); |
156 | return self->maxThreadCount; |
157 | } |
158 | |
159 | static uint32_t BL_CDECL blThreadPoolPooledThreadCount(const BLThreadPool* self_) noexcept { |
160 | const BLInternalThreadPool* self = static_cast<const BLInternalThreadPool*>(self_); |
161 | return self->pooledThreadCount; |
162 | } |
163 | |
164 | static BLResult BL_CDECL blThreadPoolSetThreadAttributes(BLThreadPool* self_, const BLThreadAttributes* attributes) noexcept { |
165 | BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_); |
166 | BLMutexGuard guard(self->mutex); |
167 | |
168 | // Verify that the provided `stackSize` is okay. |
169 | // - POSIX - Minimum stack size is `PTHREAD_STACK_MIN` and some |
170 | // implementations may enforce alignment to page-size. |
171 | // - WINDOWS - Minimum stack size is `SYSTEM_INFO::dwAllocationGranularity`, |
172 | // alignment should follow the granularity as well, however, |
173 | // WinAPI would align if the argument is mistaligned. |
174 | uint32_t stackSize = attributes->stackSize; |
175 | if (stackSize) { |
176 | const BLRuntimeSystemInfo& si = blRuntimeContext.systemInfo; |
177 | if (stackSize < si.minThreadStackSize || !blIsAligned(stackSize, si.allocationGranularity)) |
178 | return blTraceError(BL_ERROR_INVALID_VALUE); |
179 | } |
180 | |
181 | #ifndef _WIN32 |
182 | // On POSIX we try to set the attribute of `pthread_attr_t` as we want to |
183 | // catch a possible error early instead of dealing with that later during |
184 | // `pthread_create()`. |
185 | BL_PROPAGATE(blThreadSetPtAttributes(&self->ptAttr, attributes)); |
186 | #endif |
187 | |
188 | self->threadAttributes = *attributes; |
189 | return BL_SUCCESS; |
190 | } |
191 | |
192 | // ============================================================================ |
193 | // [BLThreadPool - Cleanup] |
194 | // ============================================================================ |
195 | |
196 | static void blThreadPoolThreadExitFunc(BLThread* thread, void* data) noexcept { |
197 | BLInternalThreadPool* threadPool = static_cast<BLInternalThreadPool*>(data); |
198 | thread->destroy(); |
199 | |
200 | // Signal `threadsDestroyed` event when `createdThreadCount` goes to zero. |
201 | if (blAtomicFetchSub(&threadPool->createdThreadCount) == 1) |
202 | threadPool->threadsDestroyed.signal(); |
203 | } |
204 | |
205 | static uint32_t BL_CDECL blThreadPoolCleanup(BLThreadPool* self_) noexcept { |
206 | BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_); |
207 | BLMutexGuard guard(self->mutex); |
208 | |
209 | uint32_t n = 0; |
210 | uint32_t bwIndex = 0; |
211 | uint32_t pooledThreadCount = self->pooledThreadCount; |
212 | |
213 | if (!pooledThreadCount) |
214 | return 0; |
215 | |
216 | do { |
217 | BLBitWord mask = self->pooledThreadBits.data[bwIndex]; |
218 | BLBitWordIterator<BLBitWord> it(mask); |
219 | |
220 | while (it.hasNext()) { |
221 | uint32_t threadIndex = bwIndex * blBitSizeOf<BLBitWord>() + it.next(); |
222 | BLThread* thread = self->threads[threadIndex]; |
223 | |
224 | self->threads[threadIndex] = nullptr; |
225 | thread->quit(); |
226 | |
227 | n++; |
228 | } |
229 | self->pooledThreadBits.data[bwIndex] = 0; |
230 | } while (++bwIndex < BLInternalThreadPool::BitArray::kFixedArraySize); |
231 | |
232 | self->pooledThreadCount = pooledThreadCount - n; |
233 | return n; |
234 | } |
235 | |
236 | // ============================================================================ |
237 | // [BLThreadPool - Acquire / Release] |
238 | // ============================================================================ |
239 | |
240 | static void blThreadPoolReleaseThreadsInternal(BLInternalThreadPool* self, BLThread** threads, uint32_t n) noexcept { |
241 | uint32_t i = 0; |
242 | uint32_t bwIndex = 0; |
243 | |
244 | do { |
245 | BLBitWord mask = self->pooledThreadBits.data[bwIndex] ^ blBitOnes<BLBitWord>(); |
246 | BLBitWordIterator<BLBitWord> it(mask); |
247 | |
248 | while (it.hasNext()) { |
249 | uint32_t bit = it.next(); |
250 | mask ^= blBitMask<BLBitWord>(bit); |
251 | |
252 | uint32_t threadIndex = bwIndex * blBitSizeOf<BLBitWord>() + bit; |
253 | BL_ASSERT(self->threads[threadIndex] == nullptr); |
254 | |
255 | BLThread* thread = threads[i]; |
256 | self->threads[threadIndex] = thread; |
257 | |
258 | if (++i >= n) |
259 | break; |
260 | } |
261 | |
262 | self->pooledThreadBits.data[bwIndex] = mask ^ blBitOnes<BLBitWord>(); |
263 | } while (i < n && ++bwIndex < BLInternalThreadPool::BitArray::kFixedArraySize); |
264 | |
265 | self->pooledThreadCount += n; |
266 | self->acquiredThreadCount -= n; |
267 | } |
268 | |
269 | static uint32_t blThreadPoolAcquireThreadsInternal(BLInternalThreadPool* self, BLThread** threads, uint32_t n, uint32_t flags) noexcept { |
270 | uint32_t i = 0; |
271 | |
272 | uint32_t pooledThreadCount = self->pooledThreadCount; |
273 | uint32_t acquiredThreadCount = self->acquiredThreadCount; |
274 | |
275 | if (n > pooledThreadCount) { |
276 | uint32_t createThreadCount = n - pooledThreadCount; |
277 | uint32_t remainingThreadCount = self->maxThreadCount - (acquiredThreadCount + pooledThreadCount); |
278 | |
279 | if (createThreadCount > remainingThreadCount) { |
280 | // Return if it's not possible to fulfill the `exact` requirement. |
281 | if (flags & BL_THREAD_POOL_ACQUIRE_FLAG_TRY) |
282 | return 0; |
283 | |
284 | if (flags & BL_THREAD_POOL_ACQUIRE_FLAG_FORCE_ALL) { |
285 | // Acquire / create the number of threads as required. |
286 | } |
287 | else if (flags & BL_THREAD_POOL_ACQUIRE_FLAG_FORCE_ONE) { |
288 | // Acquire / create at least one thread, we have to create it if it's not pooled. |
289 | if (!pooledThreadCount) |
290 | createThreadCount = 1; |
291 | } |
292 | else { |
293 | // Create maximum number of threads that would not exceed `maxThreadCount`. |
294 | createThreadCount = remainingThreadCount; |
295 | } |
296 | } |
297 | |
298 | if (blAtomicFetchAdd(&self->createdThreadCount, createThreadCount) == 0) |
299 | self->threadsDestroyed.reset(); |
300 | |
301 | while (i < createThreadCount) { |
302 | #ifdef _WIN32 |
303 | BLResult result = blThreadCreate(&threads[i], &self->threadAttributes, blThreadPoolThreadExitFunc, self); |
304 | #else |
305 | BLResult result = blThreadCreatePt(&threads[i], &self->ptAttr, blThreadPoolThreadExitFunc, self); |
306 | #endif |
307 | |
308 | // Failed to create a thread? |
309 | if (result != BL_SUCCESS) { |
310 | uint32_t sub = createThreadCount - i; |
311 | if (blAtomicFetchSub(&self->createdThreadCount, sub) == sub) |
312 | self->threadsDestroyed.signal(); |
313 | |
314 | if ((flags & (BL_THREAD_POOL_ACQUIRE_FLAG_TRY | BL_THREAD_POOL_ACQUIRE_FLAG_FORCE_ALL)) == 0) |
315 | break; |
316 | |
317 | blThreadPoolReleaseThreadsInternal(self, threads, i); |
318 | return result; |
319 | } |
320 | i++; |
321 | } |
322 | } |
323 | |
324 | uint32_t bwIndex = 0; |
325 | uint32_t prevI = i; |
326 | |
327 | while (i < n) { |
328 | BLBitWord mask = self->pooledThreadBits.data[bwIndex]; |
329 | BLBitWordIterator<BLBitWord> it(mask); |
330 | |
331 | while (it.hasNext()) { |
332 | uint32_t bit = it.next(); |
333 | mask ^= blBitMask<BLBitWord>(bit); |
334 | |
335 | uint32_t threadIndex = bwIndex * blBitSizeOf<BLBitWord>() + bit; |
336 | BLThread* thread = self->threads[threadIndex]; |
337 | |
338 | BL_ASSERT(thread != nullptr); |
339 | self->threads[threadIndex] = nullptr; |
340 | |
341 | threads[i] = thread; |
342 | if (++i >= n) |
343 | break; |
344 | }; |
345 | |
346 | self->pooledThreadBits.data[bwIndex] = mask; |
347 | if (++bwIndex >= BLInternalThreadPool::BitArray::kFixedArraySize) |
348 | break; |
349 | } |
350 | |
351 | self->pooledThreadCount = pooledThreadCount - (i - prevI); |
352 | self->acquiredThreadCount = acquiredThreadCount + i; |
353 | return i; |
354 | } |
355 | |
356 | static uint32_t BL_CDECL blThreadPoolAcquireThreads(BLThreadPool* self_, BLThread** threads, uint32_t n, uint32_t flags) noexcept { |
357 | BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_); |
358 | BLMutexGuard guard(self->mutex); |
359 | |
360 | return blThreadPoolAcquireThreadsInternal(self, threads, n, flags); |
361 | } |
362 | |
363 | static void BL_CDECL blThreadPoolReleaseThreads(BLThreadPool* self_, BLThread** threads, uint32_t n) noexcept { |
364 | BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_); |
365 | BLMutexGuard guard(self->mutex); |
366 | |
367 | return blThreadPoolReleaseThreadsInternal(self, threads, n); |
368 | } |
369 | |
370 | // ============================================================================ |
371 | // [BLThreadPool - Global] |
372 | // ============================================================================ |
373 | |
374 | static BLWrap<BLInternalThreadPool> blGlobalThreadPool; |
375 | BLThreadPool* blThreadPoolGlobal() noexcept { return &blGlobalThreadPool; } |
376 | |
377 | // ============================================================================ |
378 | // [BLThreadPool - RuntimeInit] |
379 | // ============================================================================ |
380 | |
381 | static void BL_CDECL blThreadPoolRtShutdown(BLRuntimeContext* rt) noexcept { |
382 | BL_UNUSED(rt); |
383 | blGlobalThreadPool.destroy(); |
384 | } |
385 | |
386 | static void BL_CDECL blThreadPoolRtCleanup(BLRuntimeContext* rt, uint32_t cleanupFlags) noexcept { |
387 | BL_UNUSED(rt); |
388 | if (cleanupFlags & BL_RUNTIME_CLEANUP_THREAD_POOL) |
389 | blGlobalThreadPool->cleanup(); |
390 | } |
391 | |
392 | void blThreadPoolRtInit(BLRuntimeContext* rt) noexcept { |
393 | // BLThreadPool virtual table. |
394 | blThreadPoolVirt.addRef = blThreadPoolAddRef; |
395 | blThreadPoolVirt.release = blThreadPoolRelease; |
396 | blThreadPoolVirt.maxThreadCount = blThreadPoolMaxThreadCount; |
397 | blThreadPoolVirt.pooledThreadCount = blThreadPoolPooledThreadCount; |
398 | blThreadPoolVirt.setThreadAttributes = blThreadPoolSetThreadAttributes; |
399 | blThreadPoolVirt.cleanup = blThreadPoolCleanup; |
400 | blThreadPoolVirt.acquireThreads = blThreadPoolAcquireThreads; |
401 | blThreadPoolVirt.releaseThreads = blThreadPoolReleaseThreads; |
402 | |
403 | // BLThreadPool built-in global instance. |
404 | BLThreadAttributes attrs {}; |
405 | attrs.stackSize = rt->systemInfo.minWorkerStackSize; |
406 | |
407 | blGlobalThreadPool.init(); |
408 | blGlobalThreadPool->setThreadAttributes(attrs); |
409 | |
410 | rt->shutdownHandlers.add(blThreadPoolRtShutdown); |
411 | rt->cleanupHandlers.add(blThreadPoolRtCleanup); |
412 | } |
413 | |
414 | // ============================================================================ |
415 | // [BLThreadPool - Unit Tests] |
416 | // ============================================================================ |
417 | |
418 | #if defined(BL_TEST) |
419 | struct ThreadTestData { |
420 | uint32_t iter; |
421 | volatile uint32_t counter; |
422 | BLThreadEvent event; |
423 | |
424 | ThreadTestData() noexcept |
425 | : counter(0), |
426 | event(false, false) {} |
427 | }; |
428 | |
429 | static void BL_CDECL test_thread_entry(BLThread* thread, void* data_) noexcept { |
430 | ThreadTestData* data = static_cast<ThreadTestData*>(data_); |
431 | INFO("[#%u] Thread %p running\n" , data->iter, thread); |
432 | } |
433 | |
434 | static void BL_CDECL test_thread_done(BLThread* thread, void* data_) noexcept { |
435 | ThreadTestData* data = static_cast<ThreadTestData*>(data_); |
436 | INFO("[#%u] Thread %p done\n" , data->iter, thread); |
437 | |
438 | if (blAtomicFetchSub(&data->counter) == 1) |
439 | data->event.signal(); |
440 | } |
441 | |
442 | UNIT(blend2d_thread_pool) { |
443 | BLThreadPool* tp = blThreadPoolGlobal(); |
444 | ThreadTestData data; |
445 | |
446 | constexpr uint32_t kThreadCount = 4; |
447 | BLThread* threads[kThreadCount]; |
448 | |
449 | // Try to allocate 10000 threads - must fail as it's over all limits. |
450 | INFO("Trying to allocate very high number of threads that should fail" ); |
451 | uint32_t n = tp->acquireThreads(nullptr, 10000, BL_THREAD_POOL_ACQUIRE_FLAG_TRY); |
452 | EXPECT(n == 0); |
453 | |
454 | INFO("Repeatedly acquiring / releasing %u threads with a simple task" , kThreadCount); |
455 | for (uint32_t i = 0; i < 10; i++) { |
456 | data.iter = i; |
457 | |
458 | INFO("[#%u] Acquiring %u threads from thread-pool" , i, kThreadCount); |
459 | uint32_t acquiredCount = tp->acquireThreads(threads, kThreadCount); |
460 | EXPECT(acquiredCount == kThreadCount); |
461 | |
462 | blAtomicStore(&data.counter, kThreadCount); |
463 | INFO("[#%u] Running %u threads" , i, kThreadCount); |
464 | for (BLThread* thread : threads) { |
465 | BLResult result = thread->run(test_thread_entry, test_thread_done, &data); |
466 | EXPECT(result == BL_SUCCESS); |
467 | } |
468 | |
469 | INFO("[#%u] Waiting and releasing" , i); |
470 | data.event.wait(); |
471 | tp->releaseThreads(threads, kThreadCount); |
472 | } |
473 | |
474 | |
475 | INFO("Cleaning up" ); |
476 | tp->cleanup(); |
477 | |
478 | INFO("Done" ); |
479 | } |
480 | #endif |
481 | |