1// Copyright 2009-2021 Intel Corporation
2// SPDX-License-Identifier: Apache-2.0
3
4#include "taskschedulerinternal.h"
5#include "../math/math.h"
6#include "../sys/sysinfo.h"
7#include <algorithm>
8
9namespace embree
10{
11 RTC_NAMESPACE_BEGIN
12
13 static MutexSys g_mutex;
14 size_t TaskScheduler::g_numThreads = 0;
15 __thread TaskScheduler* TaskScheduler::g_instance = nullptr;
16 std::vector<Ref<TaskScheduler>> g_instance_vector;
17 __thread TaskScheduler::Thread* TaskScheduler::thread_local_thread = nullptr;
18 TaskScheduler::ThreadPool* TaskScheduler::threadPool = nullptr;
19
20 template<typename Predicate, typename Body>
21 __forceinline void TaskScheduler::steal_loop(Thread& thread, const Predicate& pred, const Body& body)
22 {
23 while (true)
24 {
25 /*! some rounds that yield */
26 for (size_t i=0; i<32; i++)
27 {
28 /*! some spinning rounds */
29 const size_t threadCount = thread.threadCount();
30 for (size_t j=0; j<1024; j+=threadCount)
31 {
32 if (!pred()) return;
33 if (thread.scheduler->steal_from_other_threads(thread)) {
34 i=j=0;
35 body();
36 }
37 }
38 yield();
39 }
40 }
41 }
42
43 /*! run this task */
44 void TaskScheduler::Task::run_internal (Thread& thread) // FIXME: avoid as many dll_exports as possible
45 {
46 /* try to run if not already stolen */
47 if (try_switch_state(INITIALIZED,DONE))
48 {
49 Task* prevTask = thread.task;
50 thread.task = this;
51 // -- GODOT start --
52 // try {
53 // if (thread.scheduler->cancellingException == nullptr)
54 closure->execute();
55 // } catch (...) {
56 // if (thread.scheduler->cancellingException == nullptr)
57 // thread.scheduler->cancellingException = std::current_exception();
58 // }
59 // -- GODOT end --
60 thread.task = prevTask;
61 add_dependencies(-1);
62 }
63
64 /* steal until all dependencies have completed */
65 steal_loop(thread,
66 [&] () { return dependencies>0; },
67 [&] () { while (thread.tasks.execute_local_internal(thread,this)); });
68
69 /* now signal our parent task that we are finished */
70 if (parent)
71 parent->add_dependencies(-1);
72 }
73
74 /*! run this task */
75 dll_export void TaskScheduler::Task::run (Thread& thread) {
76 run_internal(thread);
77 }
78
79 bool TaskScheduler::TaskQueue::execute_local_internal(Thread& thread, Task* parent)
80 {
81 /* stop if we run out of local tasks or reach the waiting task */
82 if (right == 0 || &tasks[right-1] == parent)
83 return false;
84
85 /* execute task */
86 size_t oldRight = right;
87 tasks[right-1].run_internal(thread);
88 if (right != oldRight) {
89 THROW_RUNTIME_ERROR("you have to wait for spawned subtasks");
90 }
91
92 /* pop task and closure from stack */
93 right--;
94 if (tasks[right].stackPtr != size_t(-1))
95 stackPtr = tasks[right].stackPtr;
96
97 /* also move left pointer */
98 if (left >= right) left.store(right.load());
99
100 return right != 0;
101 }
102
103 dll_export bool TaskScheduler::TaskQueue::execute_local(Thread& thread, Task* parent) {
104 return execute_local_internal(thread,parent);
105 }
106
107 bool TaskScheduler::TaskQueue::steal(Thread& thread)
108 {
109 size_t l = left;
110 size_t r = right;
111 if (l < r)
112 {
113 l = left++;
114 if (l >= r)
115 return false;
116 }
117 else
118 return false;
119
120 if (!tasks[l].try_steal(thread.tasks.tasks[thread.tasks.right]))
121 return false;
122
123 thread.tasks.right++;
124 return true;
125 }
126
127 /* we steal from the left */
128 size_t TaskScheduler::TaskQueue::getTaskSizeAtLeft()
129 {
130 if (left >= right) return 0;
131 return tasks[left].N;
132 }
133
134 void threadPoolFunction(std::pair<TaskScheduler::ThreadPool*,size_t>* pair)
135 {
136 TaskScheduler::ThreadPool* pool = pair->first;
137 size_t threadIndex = pair->second;
138 delete pair;
139 pool->thread_loop(threadIndex);
140 }
141
142 TaskScheduler::ThreadPool::ThreadPool(bool set_affinity)
143 : numThreads(0), numThreadsRunning(0), set_affinity(set_affinity), running(false) {}
144
145 dll_export void TaskScheduler::ThreadPool::startThreads()
146 {
147 if (running) return;
148 setNumThreads(numThreads,true);
149 }
150
151 void TaskScheduler::ThreadPool::setNumThreads(size_t newNumThreads, bool startThreads)
152 {
153 Lock<MutexSys> lock(g_mutex);
154 assert(newNumThreads);
155 newNumThreads = min(newNumThreads, (size_t) getNumberOfLogicalThreads());
156
157 numThreads = newNumThreads;
158 if (!startThreads && !running) return;
159 running = true;
160 size_t numThreadsActive = numThreadsRunning;
161
162 mutex.lock();
163 numThreadsRunning = newNumThreads;
164 mutex.unlock();
165 condition.notify_all();
166
167 /* start new threads */
168 for (size_t t=numThreadsActive; t<numThreads; t++)
169 {
170 if (t == 0) continue;
171 auto pair = new std::pair<TaskScheduler::ThreadPool*,size_t>(this,t);
172 threads.push_back(createThread((thread_func)threadPoolFunction,pair,4*1024*1024,set_affinity ? t : -1));
173 }
174
175 /* stop some threads if we reduce the number of threads */
176 for (ssize_t t=numThreadsActive-1; t>=ssize_t(numThreadsRunning); t--) {
177 if (t == 0) continue;
178 embree::join(threads.back());
179 threads.pop_back();
180 }
181 }
182
183 TaskScheduler::ThreadPool::~ThreadPool()
184 {
185 /* leave all taskschedulers */
186 mutex.lock();
187 numThreadsRunning = 0;
188 mutex.unlock();
189 condition.notify_all();
190
191 /* wait for threads to terminate */
192 for (size_t i=0; i<threads.size(); i++)
193 embree::join(threads[i]);
194 }
195
196 dll_export void TaskScheduler::ThreadPool::add(const Ref<TaskScheduler>& scheduler)
197 {
198 mutex.lock();
199 schedulers.push_back(scheduler);
200 mutex.unlock();
201 condition.notify_all();
202 }
203
204 dll_export void TaskScheduler::ThreadPool::remove(const Ref<TaskScheduler>& scheduler)
205 {
206 Lock<MutexSys> lock(mutex);
207 for (std::list<Ref<TaskScheduler> >::iterator it = schedulers.begin(); it != schedulers.end(); it++) {
208 if (scheduler == *it) {
209 schedulers.erase(it);
210 return;
211 }
212 }
213 }
214
215 void TaskScheduler::ThreadPool::thread_loop(size_t globalThreadIndex)
216 {
217 while (globalThreadIndex < numThreadsRunning)
218 {
219 Ref<TaskScheduler> scheduler = NULL;
220 ssize_t threadIndex = -1;
221 {
222 Lock<MutexSys> lock(mutex);
223 condition.wait(mutex, [&] () { return globalThreadIndex >= numThreadsRunning || !schedulers.empty(); });
224 if (globalThreadIndex >= numThreadsRunning) break;
225 scheduler = schedulers.front();
226 threadIndex = scheduler->allocThreadIndex();
227 }
228 scheduler->thread_loop(threadIndex);
229 }
230 }
231
232 TaskScheduler::TaskScheduler()
233 : threadCounter(0), anyTasksRunning(0), hasRootTask(false)
234 {
235 threadLocal.resize(2*getNumberOfLogicalThreads()); // FIXME: this has to be 2x as in the compatibility join mode with rtcCommitScene the worker threads also join. When disallowing rtcCommitScene to join a build we can remove the 2x.
236 for (size_t i=0; i<threadLocal.size(); i++)
237 threadLocal[i].store(nullptr);
238 }
239
240 TaskScheduler::~TaskScheduler()
241 {
242 assert(threadCounter == 0);
243 }
244
245 dll_export size_t TaskScheduler::threadID()
246 {
247 Thread* thread = TaskScheduler::thread();
248 if (thread) return thread->threadIndex;
249 else return 0;
250 }
251
252 dll_export size_t TaskScheduler::threadIndex()
253 {
254 Thread* thread = TaskScheduler::thread();
255 if (thread) return thread->threadIndex;
256 else return 0;
257 }
258
259 dll_export size_t TaskScheduler::threadCount() {
260 return threadPool->size();
261 }
262
263 dll_export TaskScheduler* TaskScheduler::instance()
264 {
265 if (g_instance == NULL) {
266 Lock<MutexSys> lock(g_mutex);
267 g_instance = new TaskScheduler;
268 g_instance_vector.push_back(g_instance);
269 }
270 return g_instance;
271 }
272
273 void TaskScheduler::create(size_t numThreads, bool set_affinity, bool start_threads)
274 {
275 if (!threadPool) threadPool = new TaskScheduler::ThreadPool(set_affinity);
276 threadPool->setNumThreads(numThreads,start_threads);
277 }
278
279 void TaskScheduler::destroy() {
280 delete threadPool; threadPool = nullptr;
281 }
282
283 dll_export ssize_t TaskScheduler::allocThreadIndex()
284 {
285 size_t threadIndex = threadCounter++;
286 assert(threadIndex < threadLocal.size());
287 return threadIndex;
288 }
289
290 void TaskScheduler::join()
291 {
292 mutex.lock();
293 size_t threadIndex = allocThreadIndex();
294 condition.wait(mutex, [&] () { return hasRootTask.load(); });
295 mutex.unlock();
296 // -- GODOT start --
297 // std::exception_ptr except = thread_loop(threadIndex);
298 // if (except != nullptr) std::rethrow_exception(except);
299 thread_loop(threadIndex);
300 // -- GODOT end --
301 }
302
303 void TaskScheduler::reset() {
304 hasRootTask = false;
305 }
306
307 void TaskScheduler::wait_for_threads(size_t threadCount)
308 {
309 while (threadCounter < threadCount-1)
310 pause_cpu();
311 }
312
313 dll_export TaskScheduler::Thread* TaskScheduler::thread() {
314 return thread_local_thread;
315 }
316
317 dll_export TaskScheduler::Thread* TaskScheduler::swapThread(Thread* thread)
318 {
319 Thread* old = thread_local_thread;
320 thread_local_thread = thread;
321 return old;
322 }
323
324 dll_export bool TaskScheduler::wait()
325 {
326 Thread* thread = TaskScheduler::thread();
327 if (thread == nullptr) return true;
328 while (thread->tasks.execute_local_internal(*thread,thread->task)) {};
329 return thread->scheduler->cancellingException == nullptr;
330 }
331
332// -- GODOT start --
333// std::exception_ptr TaskScheduler::thread_loop(size_t threadIndex)
334 void TaskScheduler::thread_loop(size_t threadIndex)
335// -- GODOT end --
336 {
337 /* allocate thread structure */
338 std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
339 Thread& thread = *mthread;
340 threadLocal[threadIndex].store(&thread);
341 Thread* oldThread = swapThread(&thread);
342
343 /* main thread loop */
344 while (anyTasksRunning)
345 {
346 steal_loop(thread,
347 [&] () { return anyTasksRunning > 0; },
348 [&] () {
349 anyTasksRunning++;
350 while (thread.tasks.execute_local_internal(thread,nullptr));
351 anyTasksRunning--;
352 });
353 }
354 threadLocal[threadIndex].store(nullptr);
355 swapThread(oldThread);
356
357 /* remember exception to throw */
358 // -- GODOT start --
359 // std::exception_ptr except = nullptr;
360 // if (cancellingException != nullptr) except = cancellingException;
361 // -- GODOT end --
362 /* wait for all threads to terminate */
363 threadCounter--;
364#if defined(__WIN32__)
365 size_t loopIndex = 1;
366#endif
367#define LOOP_YIELD_THRESHOLD (4096)
368 while (threadCounter > 0) {
369#if defined(__WIN32__)
370 if ((loopIndex % LOOP_YIELD_THRESHOLD) == 0)
371 yield();
372 else
373 _mm_pause();
374 loopIndex++;
375#else
376 yield();
377#endif
378 }
379 // -- GODOT start --
380 // return except;
381 return;
382 // -- GODOT end --
383 }
384
385 bool TaskScheduler::steal_from_other_threads(Thread& thread)
386 {
387 const size_t threadIndex = thread.threadIndex;
388 const size_t threadCount = this->threadCounter;
389
390 for (size_t i=1; i<threadCount; i++)
391 {
392 pause_cpu(32);
393 size_t otherThreadIndex = threadIndex+i;
394 if (otherThreadIndex >= threadCount) otherThreadIndex -= threadCount;
395
396 Thread* othread = threadLocal[otherThreadIndex].load();
397 if (!othread)
398 continue;
399
400 if (othread->tasks.steal(thread))
401 return true;
402 }
403
404 return false;
405 }
406
407 dll_export void TaskScheduler::startThreads() {
408 threadPool->startThreads();
409 }
410
411 dll_export void TaskScheduler::addScheduler(const Ref<TaskScheduler>& scheduler) {
412 threadPool->add(scheduler);
413 }
414
415 dll_export void TaskScheduler::removeScheduler(const Ref<TaskScheduler>& scheduler) {
416 threadPool->remove(scheduler);
417 }
418
419 RTC_NAMESPACE_END
420}
421