1 | //************************************ bs::framework - Copyright 2018 Marko Pintera **************************************// |
2 | //*********** Licensed under the MIT license. See LICENSE.md for full terms. This notice is not to be removed. ***********// |
3 | #include "CoreThread/BsCoreThread.h" |
4 | #include "Threading/BsThreadPool.h" |
5 | #include "Threading/BsTaskScheduler.h" |
6 | #include "BsCoreApplication.h" |
7 | |
8 | using namespace std::placeholders; |
9 | |
10 | namespace bs |
11 | { |
// Definition of the static per-thread queue bookkeeping object declared in CoreThread.
CoreThread::QueueData CoreThread::mPerThreadQueue;
// Pointer (declared BS_THREADLOCAL) to the calling thread's queue container. Starts out null; getQueue() allocates
// the container the first time a thread requests its queue.
BS_THREADLOCAL CoreThread::ThreadQueueContainer* CoreThread::QueueData::current = nullptr;
14 | |
15 | CoreThread::CoreThread() |
16 | : mActiveFrameAlloc(0) |
17 | , mCoreThreadShutdown(false) |
18 | , mCoreThreadStarted(false) |
19 | , mCommandQueue(nullptr) |
20 | , mMaxCommandNotifyId(0) |
21 | { |
22 | for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++) |
23 | { |
24 | mFrameAllocs[i] = bs_new<FrameAlloc>(); |
25 | mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread |
26 | } |
27 | |
28 | mSimThreadId = BS_THREAD_CURRENT_ID; |
29 | mCoreThreadId = mSimThreadId; // For now |
30 | mCommandQueue = bs_new<CommandQueue<CommandQueueSync>>(BS_THREAD_CURRENT_ID); |
31 | |
32 | initCoreThread(); |
33 | } |
34 | |
35 | CoreThread::~CoreThread() |
36 | { |
37 | // TODO - What if something gets queued between the queued call to destroy_internal and this!? |
38 | shutdownCoreThread(); |
39 | |
40 | { |
41 | Lock lock(mSubmitMutex); |
42 | |
43 | for(auto& queue : mAllQueues) |
44 | bs_delete(queue); |
45 | |
46 | mAllQueues.clear(); |
47 | } |
48 | |
49 | if(mCommandQueue != nullptr) |
50 | { |
51 | bs_delete(mCommandQueue); |
52 | mCommandQueue = nullptr; |
53 | } |
54 | |
55 | for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++) |
56 | { |
57 | mFrameAllocs[i]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread |
58 | bs_delete(mFrameAllocs[i]); |
59 | } |
60 | } |
61 | |
62 | void CoreThread::initCoreThread() |
63 | { |
64 | #if !BS_FORCE_SINGLETHREADED_RENDERING |
65 | #if BS_THREAD_SUPPORT |
66 | mCoreThread = ThreadPool::instance().run("Core" , std::bind(&CoreThread::runCoreThread, this)); |
67 | |
68 | // Need to wait to unsure thread ID is correctly set before continuing |
69 | Lock lock(mThreadStartedMutex); |
70 | |
71 | while (!mCoreThreadStarted) |
72 | mCoreThreadStartedCondition.wait(lock); |
73 | #else |
74 | BS_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support." ); |
75 | #endif |
76 | #endif |
77 | } |
78 | |
79 | void CoreThread::runCoreThread() |
80 | { |
81 | #if !BS_FORCE_SINGLETHREADED_RENDERING |
82 | TaskScheduler::instance().removeWorker(); // One less worker because we are reserving one core for this thread |
83 | |
84 | { |
85 | Lock lock(mThreadStartedMutex); |
86 | |
87 | mCoreThreadStarted = true; |
88 | mCoreThreadId = BS_THREAD_CURRENT_ID; |
89 | } |
90 | |
91 | mCoreThreadStartedCondition.notify_one(); |
92 | |
93 | while(true) |
94 | { |
95 | // Wait until we get some ready commands |
96 | Queue<QueuedCommand>* commands = nullptr; |
97 | { |
98 | Lock lock(mCommandQueueMutex); |
99 | |
100 | while(mCommandQueue->isEmpty()) |
101 | { |
102 | if(mCoreThreadShutdown) |
103 | { |
104 | TaskScheduler::instance().addWorker(); |
105 | return; |
106 | } |
107 | |
108 | TaskScheduler::instance().addWorker(); // Do something else while we wait, otherwise this core will be unused |
109 | mCommandReadyCondition.wait(lock); |
110 | TaskScheduler::instance().removeWorker(); |
111 | } |
112 | |
113 | commands = mCommandQueue->flush(); |
114 | } |
115 | |
116 | // Play commands |
117 | mCommandQueue->playbackWithNotify(commands, std::bind(&CoreThread::commandCompletedNotify, this, _1)); |
118 | } |
119 | #endif |
120 | } |
121 | |
122 | void CoreThread::shutdownCoreThread() |
123 | { |
124 | #if !BS_FORCE_SINGLETHREADED_RENDERING |
125 | |
126 | { |
127 | Lock lock(mCommandQueueMutex); |
128 | mCoreThreadShutdown = true; |
129 | } |
130 | |
131 | // Wake all threads. They will quit after they see the shutdown flag |
132 | mCommandReadyCondition.notify_all(); |
133 | |
134 | mCoreThreadId = BS_THREAD_CURRENT_ID; |
135 | |
136 | mCoreThread.blockUntilComplete(); |
137 | #endif |
138 | } |
139 | |
140 | SPtr<CommandQueue<CommandQueueSync>> CoreThread::getQueue() |
141 | { |
142 | if(mPerThreadQueue.current == nullptr) |
143 | { |
144 | SPtr<CommandQueue<CommandQueueSync>> newQueue = bs_shared_ptr_new<CommandQueue<CommandQueueSync>>(BS_THREAD_CURRENT_ID); |
145 | mPerThreadQueue.current = bs_new<ThreadQueueContainer>(); |
146 | mPerThreadQueue.current->queue = newQueue; |
147 | mPerThreadQueue.current->isMain = BS_THREAD_CURRENT_ID == mSimThreadId; |
148 | |
149 | Lock lock(mSubmitMutex); |
150 | mAllQueues.push_back(mPerThreadQueue.current); |
151 | } |
152 | |
153 | return mPerThreadQueue.current->queue; |
154 | } |
155 | |
156 | void CoreThread::submitCommandQueue(CommandQueue<CommandQueueSync>& queue, bool blockUntilComplete) |
157 | { |
158 | Queue<QueuedCommand>* commands = queue.flush(); |
159 | |
160 | CoreThreadQueueFlags flags = CTQF_InternalQueue; |
161 | |
162 | if(blockUntilComplete) |
163 | flags |= CTQF_BlockUntilComplete; |
164 | |
165 | queueCommand(std::bind(&CommandQueueBase::playback, &queue, commands), flags); |
166 | } |
167 | |
168 | void CoreThread::submitAll(bool blockUntilComplete) |
169 | { |
170 | UINT32 blockCommandId = (UINT32)-1; |
171 | |
172 | { |
173 | // This lock is needed mainly because of blocking. Without it another submitting thread might flush a command |
174 | // we want to wait on. |
175 | Lock lock(mSubmitMutex); |
176 | |
177 | // Submit workers first |
178 | ThreadQueueContainer* mainQueue = nullptr; |
179 | for (auto& queue : mAllQueues) |
180 | { |
181 | if (!queue->isMain) |
182 | submitCommandQueue(*queue->queue, false); |
183 | else |
184 | mainQueue = queue; |
185 | } |
186 | |
187 | // Then main |
188 | if (mainQueue != nullptr) |
189 | submitCommandQueue(*mainQueue->queue, false); |
190 | |
191 | if(blockUntilComplete) |
192 | { |
193 | Lock lock2(mCommandQueueMutex); |
194 | |
195 | blockCommandId = mMaxCommandNotifyId++; |
196 | mCommandQueue->queue([](){}, true, blockCommandId); |
197 | } |
198 | } |
199 | |
200 | if(blockUntilComplete) |
201 | { |
202 | mCommandReadyCondition.notify_all(); |
203 | blockUntilCommandCompleted(blockCommandId); |
204 | } |
205 | } |
206 | |
207 | void CoreThread::submit(bool blockUntilComplete) |
208 | { |
209 | Lock lock(mSubmitMutex); |
210 | |
211 | CommandQueue<CommandQueueSync>& queue = *getQueue(); |
212 | Queue<QueuedCommand>* commands = queue.flush(); |
213 | |
214 | UINT32 commandId = -1; |
215 | { |
216 | Lock lock2(mCommandQueueMutex); |
217 | |
218 | if (blockUntilComplete) |
219 | { |
220 | commandId = mMaxCommandNotifyId++; |
221 | |
222 | mCommandQueue->queue([commands, &queue]() { queue.playback(commands); }, true, commandId); |
223 | } |
224 | else |
225 | mCommandQueue->queue([commands, &queue]() { queue.playback(commands); }); |
226 | } |
227 | |
228 | mCommandReadyCondition.notify_all(); |
229 | |
230 | if (blockUntilComplete) |
231 | blockUntilCommandCompleted(commandId); |
232 | } |
233 | |
234 | AsyncOp CoreThread::queueReturnCommand(std::function<void(AsyncOp&)> commandCallback, CoreThreadQueueFlags flags) |
235 | { |
236 | assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread" ); |
237 | |
238 | if (!flags.isSet(CTQF_InternalQueue)) |
239 | return getQueue()->queueReturn(commandCallback); |
240 | else |
241 | { |
242 | bool blockUntilComplete = flags.isSet(CTQF_BlockUntilComplete); |
243 | |
244 | AsyncOp op; |
245 | UINT32 commandId = -1; |
246 | { |
247 | Lock lock(mCommandQueueMutex); |
248 | |
249 | if (blockUntilComplete) |
250 | { |
251 | commandId = mMaxCommandNotifyId++; |
252 | op = mCommandQueue->queueReturn(commandCallback, true, commandId); |
253 | } |
254 | else |
255 | op = mCommandQueue->queueReturn(commandCallback); |
256 | } |
257 | |
258 | mCommandReadyCondition.notify_all(); |
259 | |
260 | if (blockUntilComplete) |
261 | blockUntilCommandCompleted(commandId); |
262 | |
263 | return op; |
264 | } |
265 | } |
266 | |
267 | void CoreThread::queueCommand(std::function<void()> commandCallback, CoreThreadQueueFlags flags) |
268 | { |
269 | assert(BS_THREAD_CURRENT_ID != getCoreThreadId() && "Cannot queue commands on the core thread for the core thread" ); |
270 | |
271 | if (!flags.isSet(CTQF_InternalQueue)) |
272 | getQueue()->queue(commandCallback); |
273 | else |
274 | { |
275 | bool blockUntilComplete = flags.isSet(CTQF_BlockUntilComplete); |
276 | |
277 | UINT32 commandId = -1; |
278 | { |
279 | Lock lock(mCommandQueueMutex); |
280 | |
281 | if (blockUntilComplete) |
282 | { |
283 | commandId = mMaxCommandNotifyId++; |
284 | mCommandQueue->queue(commandCallback, true, commandId); |
285 | } |
286 | else |
287 | mCommandQueue->queue(commandCallback); |
288 | } |
289 | |
290 | mCommandReadyCondition.notify_all(); |
291 | |
292 | if (blockUntilComplete) |
293 | blockUntilCommandCompleted(commandId); |
294 | } |
295 | } |
296 | |
297 | void CoreThread::update() |
298 | { |
299 | for (UINT32 i = 0; i < NUM_SYNC_BUFFERS; i++) |
300 | mFrameAllocs[i]->setOwnerThread(mCoreThreadId); |
301 | |
302 | mActiveFrameAlloc = (mActiveFrameAlloc + 1) % 2; |
303 | mFrameAllocs[mActiveFrameAlloc]->setOwnerThread(BS_THREAD_CURRENT_ID); // Sim thread |
304 | mFrameAllocs[mActiveFrameAlloc]->clear(); |
305 | } |
306 | |
307 | FrameAlloc* CoreThread::getFrameAlloc() const |
308 | { |
309 | return mFrameAllocs[mActiveFrameAlloc]; |
310 | } |
311 | |
312 | void CoreThread::blockUntilCommandCompleted(UINT32 commandId) |
313 | { |
314 | #if !BS_FORCE_SINGLETHREADED_RENDERING |
315 | |
316 | while(true) |
317 | { |
318 | Lock lock(mCommandNotifyMutex); |
319 | |
320 | // Check if our command id is in the completed list |
321 | auto iter = mCommandsCompleted.begin(); |
322 | for(; iter != mCommandsCompleted.end(); ++iter) |
323 | { |
324 | if(*iter == commandId) |
325 | { |
326 | mCommandsCompleted.erase(iter); |
327 | return; |
328 | } |
329 | } |
330 | |
331 | mCommandCompleteCondition.wait(lock); |
332 | } |
333 | #endif |
334 | } |
335 | |
336 | void CoreThread::commandCompletedNotify(UINT32 commandId) |
337 | { |
338 | { |
339 | Lock lock(mCommandNotifyMutex); |
340 | mCommandsCompleted.push_back(commandId); |
341 | } |
342 | |
343 | mCommandCompleteCondition.notify_all(); |
344 | } |
345 | |
346 | CoreThread& gCoreThread() |
347 | { |
348 | return CoreThread::instance(); |
349 | } |
350 | |
351 | void throwIfNotCoreThread() |
352 | { |
353 | #if !BS_FORCE_SINGLETHREADED_RENDERING |
354 | if(BS_THREAD_CURRENT_ID != CoreThread::instance().getCoreThreadId()) |
355 | BS_EXCEPT(InternalErrorException, "This method can only be accessed from the core thread." ); |
356 | #endif |
357 | } |
358 | |
359 | void throwIfCoreThread() |
360 | { |
361 | #if !BS_FORCE_SINGLETHREADED_RENDERING |
362 | if(BS_THREAD_CURRENT_ID == CoreThread::instance().getCoreThreadId()) |
363 | BS_EXCEPT(InternalErrorException, "This method cannot be accessed from the core thread." ); |
364 | #endif |
365 | } |
366 | } |