1// Copyright 2009-2021 Intel Corporation
2// SPDX-License-Identifier: Apache-2.0
3
4#pragma once
5
6#include "../sys/platform.h"
7#include "../sys/alloc.h"
8#include "../sys/barrier.h"
9#include "../sys/thread.h"
10#include "../sys/mutex.h"
11#include "../sys/condition.h"
12#include "../sys/ref.h"
13#include "../sys/atomic.h"
14#include "../math/range.h"
15#include "../../include/embree3/rtcore.h"
16
17#include <list>
18
19namespace embree
20{
21
  /* The tasking system exports some symbols to be used by the tutorials. Thus we
     also hide it in the API namespace when requested. */
24 RTC_NAMESPACE_BEGIN
25
26 struct TaskScheduler : public RefCount
27 {
28 ALIGNED_STRUCT_(64);
29 friend class Device;
30
    static const size_t TASK_STACK_SIZE = 4*1024; //!< number of Task slots in each TaskQueue (size of TaskQueue::tasks)
    static const size_t CLOSURE_STACK_SIZE = 512*1024; //!< bytes of closure storage in each TaskQueue (size of TaskQueue::stack)
33
34 struct Thread;
35
36 /*! virtual interface for all tasks */
37 struct TaskFunction {
38 virtual void execute() = 0;
39 };
40
41 /*! builds a task interface from a closure */
42 template<typename Closure>
43 struct ClosureTaskFunction : public TaskFunction
44 {
45 Closure closure;
46 __forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}
47 void execute() { closure(); };
48 };
49
50 struct __aligned(64) Task
51 {
52 /*! states a task can be in */
53 enum { DONE, INITIALIZED };
54
55 /*! switch from one state to another */
56 __forceinline void switch_state(int from, int to)
57 {
58 __memory_barrier();
59 MAYBE_UNUSED bool success = state.compare_exchange_strong(from,to);
60 assert(success);
61 }
62
63 /*! try to switch from one state to another */
64 __forceinline bool try_switch_state(int from, int to) {
65 __memory_barrier();
66 return state.compare_exchange_strong(from,to);
67 }
68
69 /*! increment/decrement dependency counter */
70 void add_dependencies(int n) {
71 dependencies+=n;
72 }
73
74 /*! initialize all tasks to DONE state by default */
75 __forceinline Task()
76 : state(DONE) {}
77
78 /*! construction of new task */
79 __forceinline Task (TaskFunction* closure, Task* parent, size_t stackPtr, size_t N)
80 : dependencies(1), stealable(true), closure(closure), parent(parent), stackPtr(stackPtr), N(N)
81 {
82 if (parent) parent->add_dependencies(+1);
83 switch_state(DONE,INITIALIZED);
84 }
85
86 /*! construction of stolen task, stealing thread will decrement initial dependency */
87 __forceinline Task (TaskFunction* closure, Task* parent)
88 : dependencies(1), stealable(false), closure(closure), parent(parent), stackPtr(-1), N(1)
89 {
90 switch_state(DONE,INITIALIZED);
91 }
92
93 /*! try to steal this task */
94 bool try_steal(Task& child)
95 {
96 if (!stealable) return false;
97 if (!try_switch_state(INITIALIZED,DONE)) return false;
98 new (&child) Task(closure, this);
99 return true;
100 }
101
102 /*! run this task */
103 dll_export void run(Thread& thread);
104
105 void run_internal(Thread& thread);
106
107 public:
108 std::atomic<int> state; //!< state this task is in
109 std::atomic<int> dependencies; //!< dependencies to wait for
110 std::atomic<bool> stealable; //!< true if task can be stolen
111 TaskFunction* closure; //!< the closure to execute
112 Task* parent; //!< parent task to signal when we are finished
113 size_t stackPtr; //!< stack location where closure is stored
114 size_t N; //!< approximative size of task
115 };
116
117 struct TaskQueue
118 {
119 TaskQueue ()
120 : left(0), right(0), stackPtr(0) {}
121
122 __forceinline void* alloc(size_t bytes, size_t align = 64)
123 {
124 size_t ofs = bytes + ((align - stackPtr) & (align-1));
125 if (stackPtr + ofs > CLOSURE_STACK_SIZE)
126 // -- GODOT start --
127 // throw std::runtime_error("closure stack overflow");
128 abort();
129 // -- GODOT end --
130 stackPtr += ofs;
131 return &stack[stackPtr-bytes];
132 }
133
134 template<typename Closure>
135 __forceinline void push_right(Thread& thread, const size_t size, const Closure& closure)
136 {
137 if (right >= TASK_STACK_SIZE)
138 // -- GODOT start --
139 // throw std::runtime_error("task stack overflow");
140 abort();
141 // -- GODOT end --
142
143 /* allocate new task on right side of stack */
144 size_t oldStackPtr = stackPtr;
145 TaskFunction* func = new (alloc(sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);
146 new (&(tasks[right.load()])) Task(func,thread.task,oldStackPtr,size);
147 right++;
148
149 /* also move left pointer */
150 if (left >= right-1) left = right-1;
151 }
152
153 dll_export bool execute_local(Thread& thread, Task* parent);
154 bool execute_local_internal(Thread& thread, Task* parent);
155 bool steal(Thread& thread);
156 size_t getTaskSizeAtLeft();
157
158 bool empty() { return right == 0; }
159
160 public:
161
162 /* task stack */
163 Task tasks[TASK_STACK_SIZE];
164 __aligned(64) std::atomic<size_t> left; //!< threads steal from left
165 __aligned(64) std::atomic<size_t> right; //!< new tasks are added to the right
166
167 /* closure stack */
168 __aligned(64) char stack[CLOSURE_STACK_SIZE];
169 size_t stackPtr;
170 };
171
172 /*! thread local structure for each thread */
173 struct Thread
174 {
175 ALIGNED_STRUCT_(64);
176
177 Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)
178 : threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}
179
180 __forceinline size_t threadCount() {
181 return scheduler->threadCounter;
182 }
183
184 size_t threadIndex; //!< ID of this thread
185 TaskQueue tasks; //!< local task queue
186 Task* task; //!< current active task
187 Ref<TaskScheduler> scheduler; //!< pointer to task scheduler
188 };
189
190 /*! pool of worker threads */
191 struct ThreadPool
192 {
193 ThreadPool (bool set_affinity);
194 ~ThreadPool ();
195
196 /*! starts the threads */
197 dll_export void startThreads();
198
199 /*! sets number of threads to use */
200 void setNumThreads(size_t numThreads, bool startThreads = false);
201
202 /*! adds a task scheduler object for scheduling */
203 dll_export void add(const Ref<TaskScheduler>& scheduler);
204
205 /*! remove the task scheduler object again */
206 dll_export void remove(const Ref<TaskScheduler>& scheduler);
207
208 /*! returns number of threads of the thread pool */
209 size_t size() const { return numThreads; }
210
211 /*! main loop for all threads */
212 void thread_loop(size_t threadIndex);
213
214 private:
215 std::atomic<size_t> numThreads;
216 std::atomic<size_t> numThreadsRunning;
217 bool set_affinity;
218 std::atomic<bool> running;
219 std::vector<thread_t> threads;
220
221 private:
222 MutexSys mutex;
223 ConditionSys condition;
224 std::list<Ref<TaskScheduler> > schedulers;
225 };
226
    /* construction and destruction (declared only, definitions are not visible here) */
    TaskScheduler ();
    ~TaskScheduler ();

    /*! initializes the task scheduler */
    static void create(size_t numThreads, bool set_affinity, bool start_threads);

    /*! destroys the task scheduler again */
    static void destroy();

    /*! lets new worker threads join the tasking system */
    void join();
    void reset();

    /*! let a worker thread allocate a thread index; the result is signed
     *  (ssize_t), while spawn_root stores it in a size_t and uses it to index
     *  threadLocal */
    dll_export ssize_t allocThreadIndex();

    /*! wait for some number of threads available (threadCount includes main thread) */
    void wait_for_threads(size_t threadCount);

    /*! thread loop for all worker threads; the GODOT patch below changes the
     *  return type from std::exception_ptr to void */
    // -- GODOT start --
    // std::exception_ptr thread_loop(size_t threadIndex);
    void thread_loop(size_t threadIndex);
    // -- GODOT end --

    /*! steals a task from a different thread */
    bool steal_from_other_threads(Thread& thread);

    /*! NOTE(review): declared only; presumably keeps stealing and invoking
     *  'body' while 'pred' holds - confirm against the definition */
    template<typename Predicate, typename Body>
    static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);
257
258 /* spawn a new task at the top of the threads task stack */
259 template<typename Closure>
260 void spawn_root(const Closure& closure, size_t size = 1, bool useThreadPool = true)
261 {
262 if (useThreadPool) startThreads();
263
264 size_t threadIndex = allocThreadIndex();
265 std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
266 Thread& thread = *mthread;
267 assert(threadLocal[threadIndex].load() == nullptr);
268 threadLocal[threadIndex] = &thread;
269 Thread* oldThread = swapThread(&thread);
270 thread.tasks.push_right(thread,size,closure);
271 {
272 Lock<MutexSys> lock(mutex);
273 anyTasksRunning++;
274 hasRootTask = true;
275 condition.notify_all();
276 }
277
278 if (useThreadPool) addScheduler(this);
279
280 while (thread.tasks.execute_local(thread,nullptr));
281 anyTasksRunning--;
282 if (useThreadPool) removeScheduler(this);
283
284 threadLocal[threadIndex] = nullptr;
285 swapThread(oldThread);
286
287 /* remember exception to throw */
288 std::exception_ptr except = nullptr;
289 if (cancellingException != nullptr) except = cancellingException;
290
291 /* wait for all threads to terminate */
292 threadCounter--;
293 while (threadCounter > 0) yield();
294 cancellingException = nullptr;
295
296 /* re-throw proper exception */
297 if (except != nullptr)
298 std::rethrow_exception(except);
299 }
300
301 /* spawn a new task at the top of the threads task stack */
302 template<typename Closure>
303 static __forceinline void spawn(size_t size, const Closure& closure)
304 {
305 Thread* thread = TaskScheduler::thread();
306 if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure);
307 else instance()->spawn_root(closure,size);
308 }
309
310 /* spawn a new task at the top of the threads task stack */
311 template<typename Closure>
312 static __forceinline void spawn(const Closure& closure) {
313 spawn(1,closure);
314 }
315
316 /* spawn a new task set */
317 template<typename Index, typename Closure>
318 static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure)
319 {
320 spawn(end-begin, [=]()
321 {
322 if (end-begin <= blockSize) {
323 return closure(range<Index>(begin,end));
324 }
325 const Index center = (begin+end)/2;
326 spawn(begin,center,blockSize,closure);
327 spawn(center,end ,blockSize,closure);
328 wait();
329 });
330 }
331
    /* work on spawned subtasks and wait until all have finished
       (NOTE(review): the meaning of the returned bool is not visible in this
       header - check the definition) */
    dll_export static bool wait();

    /* returns the ID of the current thread */
    dll_export static size_t threadID();

    /* returns the index (0..threadCount-1) of the current thread */
    dll_export static size_t threadIndex();

    /* returns the total number of threads */
    dll_export static size_t threadCount();

  private:

    /* returns the thread local task list of this worker thread; spawn() treats
       a nullptr result as "no thread context" and starts a root task instead */
    dll_export static Thread* thread();

    /* sets the thread local task list of this worker thread; spawn_root keeps
       the returned pointer and passes it back to this function when it is
       done (presumably the previously installed pointer - confirm) */
    dll_export static Thread* swapThread(Thread* thread);

    /*! returns the taskscheduler object to be used by the master thread */
    dll_export static TaskScheduler* instance();

    /*! starts the threads */
    dll_export static void startThreads();

    /*! adds a task scheduler object for scheduling */
    dll_export static void addScheduler(const Ref<TaskScheduler>& scheduler);

    /*! remove the task scheduler object again */
    dll_export static void removeScheduler(const Ref<TaskScheduler>& scheduler);
363
  private:
    /* per-scheduler state */
    std::vector<atomic<Thread*>> threadLocal; //!< thread contexts indexed by thread index; spawn_root sets its entry and resets it to nullptr afterwards
    std::atomic<size_t> threadCounter;        //!< value returned by Thread::threadCount(); spawn_root decrements it and spins until it reaches zero
    std::atomic<size_t> anyTasksRunning;      //!< incremented by spawn_root when a root task is pushed, decremented once its local execution finished
    std::atomic<bool> hasRootTask;            //!< set to true by spawn_root under 'mutex'
    std::exception_ptr cancellingException;   //!< if non-null, rethrown by spawn_root on exit; reset to nullptr there
    MutexSys mutex;                           //!< held by spawn_root while it announces a new root task
    ConditionSys condition;                   //!< notified (notify_all) by spawn_root after a root task was pushed

  private:
    /* state shared between schedulers; g_instance and thread_local_thread are
       declared __thread, i.e. they exist once per thread */
    static size_t g_numThreads;
    static __thread TaskScheduler* g_instance;
    static __thread Thread* thread_local_thread;
    static ThreadPool* threadPool;
378 };
379
380 RTC_NAMESPACE_END
381
382#if defined(RTC_NAMESPACE)
383 using RTC_NAMESPACE::TaskScheduler;
384#endif
385}
386