1// [Blend2D]
2// 2D Vector Graphics Powered by a JIT Compiler.
3//
4// [License]
5// Zlib - See LICENSE.md file in the package.
6
7#include "./blapi-build_p.h"
8#include "./blbitarray_p.h"
9#include "./blruntime_p.h"
10#include "./blsupport_p.h"
11#include "./blthreading_p.h"
12#include "./blthreadpool_p.h"
13
14#ifdef _WIN32
15 #include <process.h>
16#endif
17
18// ============================================================================
19// [Globals]
20// ============================================================================
21
// Virtual function table shared by every BLInternalThreadPool instance (its
// address is passed to the BLThreadPool base by the constructor). The function
// pointers are filled in by blThreadPoolRtInit().
static BLThreadPoolVirt blThreadPoolVirt;
23
24// ============================================================================
25// [BLThreadPool - Internal]
26// ============================================================================
27
28class BLInternalThreadPool : public BLThreadPool {
29public:
30 enum : uint32_t { kMaxThreadCount = 64 };
31 typedef BLFixedBitArray<BLBitWord, kMaxThreadCount> BitArray;
32
33 volatile size_t refCount;
34 volatile uint32_t stackSize;
35 volatile uint32_t maxThreadCount;
36 volatile uint32_t createdThreadCount;
37 volatile uint32_t pooledThreadCount;
38 volatile uint32_t acquiredThreadCount;
39 volatile uint32_t destroyWaitTimeInMS;
40
41 BLMutex mutex;
42 BLThreadEvent threadsDestroyed;
43 BLThreadAttributes threadAttributes;
44 BitArray pooledThreadBits;
45 BLThread* threads[kMaxThreadCount];
46
47#ifdef _WIN32
48#else
49 pthread_attr_t ptAttr;
50#endif
51
52 // No need to explicitly initialize anything as it should be zero initialized.
53 BLInternalThreadPool() noexcept
54 : BLThreadPool { &blThreadPoolVirt },
55 refCount(1),
56 stackSize(0),
57 maxThreadCount(kMaxThreadCount),
58 createdThreadCount(0),
59 pooledThreadCount(0),
60 acquiredThreadCount(0),
61 destroyWaitTimeInMS(200),
62 mutex(),
63 threadsDestroyed(true, true),
64 threadAttributes {},
65 pooledThreadBits {},
66 threads {} { init(); }
67
68 ~BLInternalThreadPool() noexcept {
69 if (isInitialized()) {
70 uint32_t tries = 3;
71 uint64_t waitTime = (uint64_t(destroyWaitTimeInMS) * 1000u) / tries;
72
73 do {
74 cleanup();
75 if (threadsDestroyed.timedWait(waitTime) == BL_SUCCESS)
76 break;
77 } while (--tries);
78
79 destroy();
80 }
81 }
82
83 BL_INLINE bool isInitialized() noexcept {
84 return threadsDestroyed.isInitialized();
85 }
86
87#ifdef _WIN32
88 BL_INLINE void init() noexcept {}
89 BL_INLINE void destroy() noexcept {}
90#else
91 BL_INLINE void init() noexcept {
92 int err1 = pthread_attr_init(&ptAttr);
93 BL_ASSERT(!err1);
94 BL_UNUSED(err1);
95
96 int err2 = pthread_attr_setdetachstate(&ptAttr, PTHREAD_CREATE_DETACHED);
97 BL_ASSERT(!err2);
98 BL_UNUSED(err2);
99 }
100
101 BL_INLINE void destroy() noexcept {
102 int err = pthread_attr_destroy(&ptAttr);
103 BL_ASSERT(!err);
104 BL_UNUSED(err);
105 }
106#endif
107};
108
109// ============================================================================
110// [BLThreadPool - Create / Destroy]
111// ============================================================================
112
113static void blThreadPoolDestroy(BLInternalThreadPool* self) noexcept {
114 self->~BLInternalThreadPool();
115 free(self);
116}
117
118BLThreadPool* blThreadPoolCreate() noexcept {
119 void* p = malloc(sizeof(BLInternalThreadPool));
120 if (!p)
121 return nullptr;
122
123 BLInternalThreadPool* self = new(p) BLInternalThreadPool();
124 if (!self->isInitialized()) {
125 blThreadPoolDestroy(self);
126 return nullptr;
127 }
128
129 return self;
130}
131
132// ============================================================================
133// [BLThreadPool - AddRef / Release]
134// ============================================================================
135
136static BLThreadPool* BL_CDECL blThreadPoolAddRef(BLThreadPool* self_) noexcept {
137 BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_);
138 if (self->refCount != 0)
139 blAtomicFetchAdd(&self->refCount);
140 return self;
141}
142
143static BLResult BL_CDECL blThreadPoolRelease(BLThreadPool* self_) noexcept {
144 BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_);
145 if (self->refCount != 0 && blAtomicFetchSub(&self->refCount) == 1)
146 blThreadPoolDestroy(self);
147 return BL_SUCCESS;
148}
149
150// ============================================================================
151// [BLThreadPool - Properties]
152// ============================================================================
153
154static uint32_t BL_CDECL blThreadPoolMaxThreadCount(const BLThreadPool* self_) noexcept {
155 const BLInternalThreadPool* self = static_cast<const BLInternalThreadPool*>(self_);
156 return self->maxThreadCount;
157}
158
159static uint32_t BL_CDECL blThreadPoolPooledThreadCount(const BLThreadPool* self_) noexcept {
160 const BLInternalThreadPool* self = static_cast<const BLInternalThreadPool*>(self_);
161 return self->pooledThreadCount;
162}
163
164static BLResult BL_CDECL blThreadPoolSetThreadAttributes(BLThreadPool* self_, const BLThreadAttributes* attributes) noexcept {
165 BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_);
166 BLMutexGuard guard(self->mutex);
167
168 // Verify that the provided `stackSize` is okay.
169 // - POSIX - Minimum stack size is `PTHREAD_STACK_MIN` and some
170 // implementations may enforce alignment to page-size.
171 // - WINDOWS - Minimum stack size is `SYSTEM_INFO::dwAllocationGranularity`,
172 // alignment should follow the granularity as well, however,
173 // WinAPI would align if the argument is mistaligned.
174 uint32_t stackSize = attributes->stackSize;
175 if (stackSize) {
176 const BLRuntimeSystemInfo& si = blRuntimeContext.systemInfo;
177 if (stackSize < si.minThreadStackSize || !blIsAligned(stackSize, si.allocationGranularity))
178 return blTraceError(BL_ERROR_INVALID_VALUE);
179 }
180
181#ifndef _WIN32
182 // On POSIX we try to set the attribute of `pthread_attr_t` as we want to
183 // catch a possible error early instead of dealing with that later during
184 // `pthread_create()`.
185 BL_PROPAGATE(blThreadSetPtAttributes(&self->ptAttr, attributes));
186#endif
187
188 self->threadAttributes = *attributes;
189 return BL_SUCCESS;
190}
191
192// ============================================================================
193// [BLThreadPool - Cleanup]
194// ============================================================================
195
196static void blThreadPoolThreadExitFunc(BLThread* thread, void* data) noexcept {
197 BLInternalThreadPool* threadPool = static_cast<BLInternalThreadPool*>(data);
198 thread->destroy();
199
200 // Signal `threadsDestroyed` event when `createdThreadCount` goes to zero.
201 if (blAtomicFetchSub(&threadPool->createdThreadCount) == 1)
202 threadPool->threadsDestroyed.signal();
203}
204
205static uint32_t BL_CDECL blThreadPoolCleanup(BLThreadPool* self_) noexcept {
206 BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_);
207 BLMutexGuard guard(self->mutex);
208
209 uint32_t n = 0;
210 uint32_t bwIndex = 0;
211 uint32_t pooledThreadCount = self->pooledThreadCount;
212
213 if (!pooledThreadCount)
214 return 0;
215
216 do {
217 BLBitWord mask = self->pooledThreadBits.data[bwIndex];
218 BLBitWordIterator<BLBitWord> it(mask);
219
220 while (it.hasNext()) {
221 uint32_t threadIndex = bwIndex * blBitSizeOf<BLBitWord>() + it.next();
222 BLThread* thread = self->threads[threadIndex];
223
224 self->threads[threadIndex] = nullptr;
225 thread->quit();
226
227 n++;
228 }
229 self->pooledThreadBits.data[bwIndex] = 0;
230 } while (++bwIndex < BLInternalThreadPool::BitArray::kFixedArraySize);
231
232 self->pooledThreadCount = pooledThreadCount - n;
233 return n;
234}
235
236// ============================================================================
237// [BLThreadPool - Acquire / Release]
238// ============================================================================
239
240static void blThreadPoolReleaseThreadsInternal(BLInternalThreadPool* self, BLThread** threads, uint32_t n) noexcept {
241 uint32_t i = 0;
242 uint32_t bwIndex = 0;
243
244 do {
245 BLBitWord mask = self->pooledThreadBits.data[bwIndex] ^ blBitOnes<BLBitWord>();
246 BLBitWordIterator<BLBitWord> it(mask);
247
248 while (it.hasNext()) {
249 uint32_t bit = it.next();
250 mask ^= blBitMask<BLBitWord>(bit);
251
252 uint32_t threadIndex = bwIndex * blBitSizeOf<BLBitWord>() + bit;
253 BL_ASSERT(self->threads[threadIndex] == nullptr);
254
255 BLThread* thread = threads[i];
256 self->threads[threadIndex] = thread;
257
258 if (++i >= n)
259 break;
260 }
261
262 self->pooledThreadBits.data[bwIndex] = mask ^ blBitOnes<BLBitWord>();
263 } while (i < n && ++bwIndex < BLInternalThreadPool::BitArray::kFixedArraySize);
264
265 self->pooledThreadCount += n;
266 self->acquiredThreadCount -= n;
267}
268
269static uint32_t blThreadPoolAcquireThreadsInternal(BLInternalThreadPool* self, BLThread** threads, uint32_t n, uint32_t flags) noexcept {
270 uint32_t i = 0;
271
272 uint32_t pooledThreadCount = self->pooledThreadCount;
273 uint32_t acquiredThreadCount = self->acquiredThreadCount;
274
275 if (n > pooledThreadCount) {
276 uint32_t createThreadCount = n - pooledThreadCount;
277 uint32_t remainingThreadCount = self->maxThreadCount - (acquiredThreadCount + pooledThreadCount);
278
279 if (createThreadCount > remainingThreadCount) {
280 // Return if it's not possible to fulfill the `exact` requirement.
281 if (flags & BL_THREAD_POOL_ACQUIRE_FLAG_TRY)
282 return 0;
283
284 if (flags & BL_THREAD_POOL_ACQUIRE_FLAG_FORCE_ALL) {
285 // Acquire / create the number of threads as required.
286 }
287 else if (flags & BL_THREAD_POOL_ACQUIRE_FLAG_FORCE_ONE) {
288 // Acquire / create at least one thread, we have to create it if it's not pooled.
289 if (!pooledThreadCount)
290 createThreadCount = 1;
291 }
292 else {
293 // Create maximum number of threads that would not exceed `maxThreadCount`.
294 createThreadCount = remainingThreadCount;
295 }
296 }
297
298 if (blAtomicFetchAdd(&self->createdThreadCount, createThreadCount) == 0)
299 self->threadsDestroyed.reset();
300
301 while (i < createThreadCount) {
302 #ifdef _WIN32
303 BLResult result = blThreadCreate(&threads[i], &self->threadAttributes, blThreadPoolThreadExitFunc, self);
304 #else
305 BLResult result = blThreadCreatePt(&threads[i], &self->ptAttr, blThreadPoolThreadExitFunc, self);
306 #endif
307
308 // Failed to create a thread?
309 if (result != BL_SUCCESS) {
310 uint32_t sub = createThreadCount - i;
311 if (blAtomicFetchSub(&self->createdThreadCount, sub) == sub)
312 self->threadsDestroyed.signal();
313
314 if ((flags & (BL_THREAD_POOL_ACQUIRE_FLAG_TRY | BL_THREAD_POOL_ACQUIRE_FLAG_FORCE_ALL)) == 0)
315 break;
316
317 blThreadPoolReleaseThreadsInternal(self, threads, i);
318 return result;
319 }
320 i++;
321 }
322 }
323
324 uint32_t bwIndex = 0;
325 uint32_t prevI = i;
326
327 while (i < n) {
328 BLBitWord mask = self->pooledThreadBits.data[bwIndex];
329 BLBitWordIterator<BLBitWord> it(mask);
330
331 while (it.hasNext()) {
332 uint32_t bit = it.next();
333 mask ^= blBitMask<BLBitWord>(bit);
334
335 uint32_t threadIndex = bwIndex * blBitSizeOf<BLBitWord>() + bit;
336 BLThread* thread = self->threads[threadIndex];
337
338 BL_ASSERT(thread != nullptr);
339 self->threads[threadIndex] = nullptr;
340
341 threads[i] = thread;
342 if (++i >= n)
343 break;
344 };
345
346 self->pooledThreadBits.data[bwIndex] = mask;
347 if (++bwIndex >= BLInternalThreadPool::BitArray::kFixedArraySize)
348 break;
349 }
350
351 self->pooledThreadCount = pooledThreadCount - (i - prevI);
352 self->acquiredThreadCount = acquiredThreadCount + i;
353 return i;
354}
355
356static uint32_t BL_CDECL blThreadPoolAcquireThreads(BLThreadPool* self_, BLThread** threads, uint32_t n, uint32_t flags) noexcept {
357 BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_);
358 BLMutexGuard guard(self->mutex);
359
360 return blThreadPoolAcquireThreadsInternal(self, threads, n, flags);
361}
362
363static void BL_CDECL blThreadPoolReleaseThreads(BLThreadPool* self_, BLThread** threads, uint32_t n) noexcept {
364 BLInternalThreadPool* self = static_cast<BLInternalThreadPool*>(self_);
365 BLMutexGuard guard(self->mutex);
366
367 return blThreadPoolReleaseThreadsInternal(self, threads, n);
368}
369
370// ============================================================================
371// [BLThreadPool - Global]
372// ============================================================================
373
374static BLWrap<BLInternalThreadPool> blGlobalThreadPool;
375BLThreadPool* blThreadPoolGlobal() noexcept { return &blGlobalThreadPool; }
376
377// ============================================================================
378// [BLThreadPool - RuntimeInit]
379// ============================================================================
380
381static void BL_CDECL blThreadPoolRtShutdown(BLRuntimeContext* rt) noexcept {
382 BL_UNUSED(rt);
383 blGlobalThreadPool.destroy();
384}
385
386static void BL_CDECL blThreadPoolRtCleanup(BLRuntimeContext* rt, uint32_t cleanupFlags) noexcept {
387 BL_UNUSED(rt);
388 if (cleanupFlags & BL_RUNTIME_CLEANUP_THREAD_POOL)
389 blGlobalThreadPool->cleanup();
390}
391
392void blThreadPoolRtInit(BLRuntimeContext* rt) noexcept {
393 // BLThreadPool virtual table.
394 blThreadPoolVirt.addRef = blThreadPoolAddRef;
395 blThreadPoolVirt.release = blThreadPoolRelease;
396 blThreadPoolVirt.maxThreadCount = blThreadPoolMaxThreadCount;
397 blThreadPoolVirt.pooledThreadCount = blThreadPoolPooledThreadCount;
398 blThreadPoolVirt.setThreadAttributes = blThreadPoolSetThreadAttributes;
399 blThreadPoolVirt.cleanup = blThreadPoolCleanup;
400 blThreadPoolVirt.acquireThreads = blThreadPoolAcquireThreads;
401 blThreadPoolVirt.releaseThreads = blThreadPoolReleaseThreads;
402
403 // BLThreadPool built-in global instance.
404 BLThreadAttributes attrs {};
405 attrs.stackSize = rt->systemInfo.minWorkerStackSize;
406
407 blGlobalThreadPool.init();
408 blGlobalThreadPool->setThreadAttributes(attrs);
409
410 rt->shutdownHandlers.add(blThreadPoolRtShutdown);
411 rt->cleanupHandlers.add(blThreadPoolRtCleanup);
412}
413
414// ============================================================================
415// [BLThreadPool - Unit Tests]
416// ============================================================================
417
#if defined(BL_TEST)
// State shared between the unit test and the threads it runs.
struct ThreadTestData {
  uint32_t iter;
  // Number of threads whose task has not finished yet.
  volatile uint32_t counter;
  // Signaled by the thread that finishes last (see test_thread_done()).
  BLThreadEvent event;

  ThreadTestData() noexcept
    : counter(0),
      event(false, false) {}
};

// Work function - only reports that the thread is running.
static void BL_CDECL test_thread_entry(BLThread* thread, void* data_) noexcept {
  const ThreadTestData* testData = static_cast<const ThreadTestData*>(data_);
  INFO("[#%u] Thread %p running\n", testData->iter, thread);
}

// Completion function - the thread that drops `counter` to zero wakes the test.
static void BL_CDECL test_thread_done(BLThread* thread, void* data_) noexcept {
  ThreadTestData* testData = static_cast<ThreadTestData*>(data_);
  INFO("[#%u] Thread %p done\n", testData->iter, thread);

  const bool isLast = blAtomicFetchSub(&testData->counter) == 1;
  if (isLast)
    testData->event.signal();
}

UNIT(blend2d_thread_pool) {
  constexpr uint32_t kThreadCount = 4;
  constexpr uint32_t kIterationCount = 10;

  BLThreadPool* pool = blThreadPoolGlobal();
  ThreadTestData data;
  BLThread* threads[kThreadCount];

  // 10000 threads is over every limit, so TRY must acquire nothing.
  INFO("Trying to allocate very high number of threads that should fail");
  uint32_t overLimitCount = pool->acquireThreads(nullptr, 10000, BL_THREAD_POOL_ACQUIRE_FLAG_TRY);
  EXPECT(overLimitCount == 0);

  INFO("Repeatedly acquiring / releasing %u threads with a simple task", kThreadCount);
  for (uint32_t iteration = 0; iteration < kIterationCount; iteration++) {
    data.iter = iteration;

    INFO("[#%u] Acquiring %u threads from thread-pool", iteration, kThreadCount);
    uint32_t acquiredCount = pool->acquireThreads(threads, kThreadCount);
    EXPECT(acquiredCount == kThreadCount);

    // Arm the counter before any thread starts - the last finisher signals `event`.
    blAtomicStore(&data.counter, kThreadCount);
    INFO("[#%u] Running %u threads", iteration, kThreadCount);
    for (uint32_t t = 0; t < kThreadCount; t++) {
      BLResult result = threads[t]->run(test_thread_entry, test_thread_done, &data);
      EXPECT(result == BL_SUCCESS);
    }

    INFO("[#%u] Waiting and releasing", iteration);
    data.event.wait();
    pool->releaseThreads(threads, kThreadCount);
  }

  INFO("Cleaning up");
  pool->cleanup();

  INFO("Done");
}
#endif
481