1//************************************ bs::framework - Copyright 2018 Marko Pintera **************************************//
2//*********** Licensed under the MIT license. See LICENSE.md for full terms. This notice is not to be removed. ***********//
3#include "CoreThread/BsCoreThread.h"
4#include "Threading/BsThreadPool.h"
5#include "Threading/BsTaskScheduler.h"
6#include "BsCoreApplication.h"
7
8using namespace std::placeholders;
9
10namespace bs
11{
12 CoreThread::QueueData CoreThread::mPerThreadQueue;
13 BS_THREADLOCAL CoreThread::ThreadQueueContainer* CoreThread::QueueData::current = nullptr;
14
15 CoreThread::CoreThread()
16 : mActiveFrameAlloc(0)
17 , mCoreThreadShutdown(false)
18 , mCoreThreadStarted(false)
19 , mCommandQueue(nullptr)
20 , mMaxCommandNotifyId(0)
21 {
22 for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++)
23 {
24 mFrameAllocs[i] = bs_new<FrameAlloc>();
25 mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
26 }
27
28 mSimThreadId = BS_THREAD_CURRENT_ID;
29 mCoreThreadId = mSimThreadId; // For now
30 mCommandQueue = bs_new<CommandQueue<CommandQueueSync>>(BS_THREAD_CURRENT_ID);
31
32 initCoreThread();
33 }
34
35 CoreThread::~CoreThread()
36 {
37 // TODO - What if something gets queued between the queued call to destroy_internal and this!?
38 shutdownCoreThread();
39
40 {
41 Lock lock(mSubmitMutex);
42
43 for(auto& queue : mAllQueues)
44 bs_delete(queue);
45
46 mAllQueues.clear();
47 }
48
49 if(mCommandQueue != nullptr)
50 {
51 bs_delete(mCommandQueue);
52 mCommandQueue = nullptr;
53 }
54
55 for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++)
56 {
57 mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
58 bs_delete(mFrameAllocs[i]);
59 }
60 }
61
62 void CoreThread::initCoreThread()
63 {
64#if !BS_FORCE_SINGLETHREADED_RENDERING
65#if BS_THREAD_SUPPORT
66 mCoreThread = ThreadPool::instance().run("Core", std::bind(&CoreThread::runCoreThread, this));
67
68 // Need to wait to unsure thread ID is correctly set before continuing
69 Lock lock(mThreadStartedMutex);
70
71 while (!mCoreThreadStarted)
72 mCoreThreadStartedCondition.wait(lock);
73#else
74 BS_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support.");
75#endif
76#endif
77 }
78
79 void CoreThread::runCoreThread()
80 {
81#if !BS_FORCE_SINGLETHREADED_RENDERING
82 TaskScheduler::instance().removeWorker(); // One less worker because we are reserving one core for this thread
83
84 {
85 Lock lock(mThreadStartedMutex);
86
87 mCoreThreadStarted = true;
88 mCoreThreadId = BS_THREAD_CURRENT_ID;
89 }
90
91 mCoreThreadStartedCondition.notify_one();
92
93 while(true)
94 {
95 // Wait until we get some ready commands
96 Queue<QueuedCommand>* commands = nullptr;
97 {
98 Lock lock(mCommandQueueMutex);
99
100 while(mCommandQueue->isEmpty())
101 {
102 if(mCoreThreadShutdown)
103 {
104 TaskScheduler::instance().addWorker();
105 return;
106 }
107
108 TaskScheduler::instance().addWorker(); // Do something else while we wait, otherwise this core will be unused
109 mCommandReadyCondition.wait(lock);
110 TaskScheduler::instance().removeWorker();
111 }
112
113 commands = mCommandQueue->flush();
114 }
115
116 // Play commands
117 mCommandQueue->playbackWithNotify(commands, std::bind(&CoreThread::commandCompletedNotify, this, _1));
118 }
119#endif
120 }
121
122 void CoreThread::shutdownCoreThread()
123 {
124#if !BS_FORCE_SINGLETHREADED_RENDERING
125
126 {
127 Lock lock(mCommandQueueMutex);
128 mCoreThreadShutdown = true;
129 }
130
131 // Wake all threads. They will quit after they see the shutdown flag
132 mCommandReadyCondition.notify_all();
133
134 mCoreThreadId = BS_THREAD_CURRENT_ID;
135
136 mCoreThread.blockUntilComplete();
137#endif
138 }
139
140 SPtr<CommandQueue<CommandQueueSync>> CoreThread::getQueue()
141 {
142 if(mPerThreadQueue.current == nullptr)
143 {
144 SPtr<CommandQueue<CommandQueueSync>> newQueue = bs_shared_ptr_new<CommandQueue<CommandQueueSync>>(BS_THREAD_CURRENT_ID);
145 mPerThreadQueue.current = bs_new<ThreadQueueContainer>();
146 mPerThreadQueue.current->queue = newQueue;
147 mPerThreadQueue.current->isMain = BS_THREAD_CURRENT_ID == mSimThreadId;
148
149 Lock lock(mSubmitMutex);
150 mAllQueues.push_back(mPerThreadQueue.current);
151 }
152
153 return mPerThreadQueue.current->queue;
154 }
155
156 void CoreThread::submitCommandQueue(CommandQueue<CommandQueueSync>& queue, bool blockUntilComplete)
157 {
158 Queue<QueuedCommand>* commands = queue.flush();
159
160 CoreThreadQueueFlags flags = CTQF_InternalQueue;
161
162 if(blockUntilComplete)
163 flags |= CTQF_BlockUntilComplete;
164
165 queueCommand(std::bind(&CommandQueueBase::playback, &queue, commands), flags);
166 }
167
168 void CoreThread::submitAll(bool blockUntilComplete)
169 {
170 UINT32 blockCommandId = (UINT32)-1;
171
172 {
173 // This lock is needed mainly because of blocking. Without it another submitting thread might flush a command
174 // we want to wait on.
175 Lock lock(mSubmitMutex);
176
177 // Submit workers first
178 ThreadQueueContainer* mainQueue = nullptr;
179 for (auto& queue : mAllQueues)
180 {
181 if (!queue->isMain)
182 submitCommandQueue(*queue->queue, false);
183 else
184 mainQueue = queue;
185 }
186
187 // Then main
188 if (mainQueue != nullptr)
189 submitCommandQueue(*mainQueue->queue, false);
190
191 if(blockUntilComplete)
192 {
193 Lock lock2(mCommandQueueMutex);
194
195 blockCommandId = mMaxCommandNotifyId++;
196 mCommandQueue->queue([](){}, true, blockCommandId);
197 }
198 }
199
200 if(blockUntilComplete)
201 {
202 mCommandReadyCondition.notify_all();
203 blockUntilCommandCompleted(blockCommandId);
204 }
205 }
206
207 void CoreThread::submit(bool blockUntilComplete)
208 {
209 Lock lock(mSubmitMutex);
210
211 CommandQueue<CommandQueueSync>& queue = *getQueue();
212 Queue<QueuedCommand>* commands = queue.flush();
213
214 UINT32 commandId = -1;
215 {
216 Lock lock2(mCommandQueueMutex);
217
218 if (blockUntilComplete)
219 {
220 commandId = mMaxCommandNotifyId++;
221
222 mCommandQueue->queue([commands, &queue]() { queue.playback(commands); }, true, commandId);
223 }
224 else
225 mCommandQueue->queue([commands, &queue]() { queue.playback(commands); });
226 }
227
228 mCommandReadyCondition.notify_all();
229
230 if (blockUntilComplete)
231 blockUntilCommandCompleted(commandId);
232 }
233
234 AsyncOp CoreThread::queueReturnCommand(std::function<void(AsyncOp&)> commandCallback, CoreThreadQueueFlags flags)
235 {
236 assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread");
237
238 if (!flags.isSet(CTQF_InternalQueue))
239 return getQueue()->queueReturn(commandCallback);
240 else
241 {
242 bool blockUntilComplete = flags.isSet(CTQF_BlockUntilComplete);
243
244 AsyncOp op;
245 UINT32 commandId = -1;
246 {
247 Lock lock(mCommandQueueMutex);
248
249 if (blockUntilComplete)
250 {
251 commandId = mMaxCommandNotifyId++;
252 op = mCommandQueue->queueReturn(commandCallback, true, commandId);
253 }
254 else
255 op = mCommandQueue->queueReturn(commandCallback);
256 }
257
258 mCommandReadyCondition.notify_all();
259
260 if (blockUntilComplete)
261 blockUntilCommandCompleted(commandId);
262
263 return op;
264 }
265 }
266
267 void CoreThread::queueCommand(std::function<void()> commandCallback, CoreThreadQueueFlags flags)
268 {
269 assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread");
270
271 if (!flags.isSet(CTQF_InternalQueue))
272 getQueue()->queue(commandCallback);
273 else
274 {
275 bool blockUntilComplete = flags.isSet(CTQF_BlockUntilComplete);
276
277 UINT32 commandId = -1;
278 {
279 Lock lock(mCommandQueueMutex);
280
281 if (blockUntilComplete)
282 {
283 commandId = mMaxCommandNotifyId++;
284 mCommandQueue->queue(commandCallback, true, commandId);
285 }
286 else
287 mCommandQueue->queue(commandCallback);
288 }
289
290 mCommandReadyCondition.notify_all();
291
292 if (blockUntilComplete)
293 blockUntilCommandCompleted(commandId);
294 }
295 }
296
297 void CoreThread::update()
298 {
299 for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++)
300 mFrameAllocs[i]->setOwnerThread(mCoreThreadId);
301
302 mActiveFrameAlloc = (mActiveFrameAlloc + 1) % 2;
303 mFrameAllocs[mActiveFrameAlloc]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread
304 mFrameAllocs[mActiveFrameAlloc]->clear();
305 }
306
307 FrameAlloc* CoreThread::getFrameAlloc() const
308 {
309 return mFrameAllocs[mActiveFrameAlloc];
310 }
311
312 void CoreThread::blockUntilCommandCompleted(UINT32 commandId)
313 {
314#if !BS_FORCE_SINGLETHREADED_RENDERING
315
316 while(true)
317 {
318 Lock lock(mCommandNotifyMutex);
319
320 // Check if our command id is in the completed list
321 auto iter = mCommandsCompleted.begin();
322 for(; iter != mCommandsCompleted.end(); ++iter)
323 {
324 if(*iter == commandId)
325 {
326 mCommandsCompleted.erase(iter);
327 return;
328 }
329 }
330
331 mCommandCompleteCondition.wait(lock);
332 }
333#endif
334 }
335
336 void CoreThread::commandCompletedNotify(UINT32 commandId)
337 {
338 {
339 Lock lock(mCommandNotifyMutex);
340 mCommandsCompleted.push_back(commandId);
341 }
342
343 mCommandCompleteCondition.notify_all();
344 }
345
346 CoreThread& gCoreThread()
347 {
348 return CoreThread::instance();
349 }
350
351 void throwIfNotCoreThread()
352 {
353#if !BS_FORCE_SINGLETHREADED_RENDERING
354 if(BS_THREAD_CURRENT_ID != CoreThread::instance().getCoreThreadId())
355 BS_EXCEPT(InternalErrorException, "This method can only be accessed from the core thread.");
356#endif
357 }
358
359 void throwIfCoreThread()
360 {
361#if !BS_FORCE_SINGLETHREADED_RENDERING
362 if(BS_THREAD_CURRENT_ID == CoreThread::instance().getCoreThreadId())
363 BS_EXCEPT(InternalErrorException, "This method cannot be accessed from the core thread.");
364#endif
365 }
366}