1//************************************ bs::framework - Copyright 2018 Marko Pintera **************************************//
2//*********** Licensed under the MIT license. See LICENSE.md for full terms. This notice is not to be removed. ***********//
3#include "CoreThread/BsCommandQueue.h"
4#include "Error/BsException.h"
5#include "CoreThread/BsCoreThread.h"
6#include "Debug/BsDebug.h"
7
8namespace bs
9{
10#if BS_DEBUG_MODE
11 CommandQueueBase::CommandQueueBase(ThreadId threadId)
12 :mMyThreadId(threadId), mMaxDebugIdx(0)
13 {
14 mAsyncOpSyncData = bs_shared_ptr_new<AsyncOpSyncData>();
15 mCommands = bs_new<bs::Queue<QueuedCommand>>();
16
17 {
18 Lock lock(CommandQueueBreakpointMutex);
19
20 mCommandQueueIdx = MaxCommandQueueIdx++;
21 }
22 }
23#else
24 CommandQueueBase::CommandQueueBase(ThreadId threadId)
25 :mMyThreadId(threadId)
26 {
27 mAsyncOpSyncData = bs_shared_ptr_new<AsyncOpSyncData>();
28 mCommands = bs_new<bs::Queue<QueuedCommand>>();
29 }
30#endif
31
32 CommandQueueBase::~CommandQueueBase()
33 {
34 if(mCommands != nullptr)
35 bs_delete(mCommands);
36
37 while(!mEmptyCommandQueues.empty())
38 {
39 bs_delete(mEmptyCommandQueues.top());
40 mEmptyCommandQueues.pop();
41 }
42 }
43
44 AsyncOp CommandQueueBase::queueReturn(std::function<void(AsyncOp&)> commandCallback, bool _notifyWhenComplete, UINT32 _callbackId)
45 {
46#if BS_DEBUG_MODE
47 breakIfNeeded(mCommandQueueIdx, mMaxDebugIdx);
48
49 QueuedCommand newCommand(commandCallback, mMaxDebugIdx++, mAsyncOpSyncData, _notifyWhenComplete, _callbackId);
50#else
51 QueuedCommand newCommand(commandCallback, mAsyncOpSyncData, _notifyWhenComplete, _callbackId);
52#endif
53
54 mCommands->push(newCommand);
55
56#if BS_FORCE_SINGLETHREADED_RENDERING
57 Queue<QueuedCommand>* commands = flush();
58 playback(commands);
59#endif
60
61 return newCommand.asyncOp;
62 }
63
64 void CommandQueueBase::queue(std::function<void()> commandCallback, bool _notifyWhenComplete, UINT32 _callbackId)
65 {
66#if BS_DEBUG_MODE
67 breakIfNeeded(mCommandQueueIdx, mMaxDebugIdx);
68
69 QueuedCommand newCommand(commandCallback, mMaxDebugIdx++, _notifyWhenComplete, _callbackId);
70#else
71 QueuedCommand newCommand(commandCallback, _notifyWhenComplete, _callbackId);
72#endif
73
74 mCommands->push(newCommand);
75
76#if BS_FORCE_SINGLETHREADED_RENDERING
77 Queue<QueuedCommand>* commands = flush();
78 playback(commands);
79#endif
80 }
81
82 bs::Queue<QueuedCommand>* CommandQueueBase::flush()
83 {
84 bs::Queue<QueuedCommand>* oldCommands = mCommands;
85
86 Lock lock(mEmptyCommandQueueMutex);
87 if(!mEmptyCommandQueues.empty())
88 {
89 mCommands = mEmptyCommandQueues.top();
90 mEmptyCommandQueues.pop();
91 }
92 else
93 {
94 mCommands = bs_new<bs::Queue<QueuedCommand>>();
95 }
96
97 return oldCommands;
98 }
99
100 void CommandQueueBase::playbackWithNotify(bs::Queue<QueuedCommand>* commands, std::function<void(UINT32)> notifyCallback)
101 {
102 THROW_IF_NOT_CORE_THREAD;
103
104 if(commands == nullptr)
105 return;
106
107 while(!commands->empty())
108 {
109 QueuedCommand& command = commands->front();
110
111 if(command.returnsValue)
112 {
113 AsyncOp& op = command.asyncOp;
114 command.callbackWithReturnValue(op);
115
116 if(!command.asyncOp.hasCompleted())
117 {
118 LOGDBG("Async operation return value wasn't resolved properly. Resolving automatically to nullptr. " \
119 "Make sure to complete the operation before returning from the command callback method.");
120 command.asyncOp._completeOperation(nullptr);
121 }
122 }
123 else
124 {
125 command.callback();
126 }
127
128 if(command.notifyWhenComplete && notifyCallback != nullptr)
129 {
130 notifyCallback(command.callbackId);
131 }
132
133 commands->pop();
134 }
135
136 Lock lock(mEmptyCommandQueueMutex);
137 mEmptyCommandQueues.push(commands);
138 }
139
140 void CommandQueueBase::playback(bs::Queue<QueuedCommand>* commands)
141 {
142 playbackWithNotify(commands, std::function<void(UINT32)>());
143 }
144
145 void CommandQueueBase::cancelAll()
146 {
147 bs::Queue<QueuedCommand>* commands = flush();
148
149 while(!commands->empty())
150 commands->pop();
151
152 Lock lock(mEmptyCommandQueueMutex);
153 mEmptyCommandQueues.push(commands);
154 }
155
156 bool CommandQueueBase::isEmpty()
157 {
158 if(mCommands != nullptr && mCommands->size() > 0)
159 return false;
160
161 return true;
162 }
163
164 void CommandQueueBase::throwInvalidThreadException(const String& message) const
165 {
166 BS_EXCEPT(InternalErrorException, message);
167 }
168
169#if BS_DEBUG_MODE
170 Mutex CommandQueueBase::CommandQueueBreakpointMutex;
171
172 UINT32 CommandQueueBase::MaxCommandQueueIdx = 0;
173
174 UnorderedSet<CommandQueueBase::QueueBreakpoint, CommandQueueBase::QueueBreakpoint::HashFunction,
175 CommandQueueBase::QueueBreakpoint::EqualFunction> CommandQueueBase::SetBreakpoints;
176
177 inline size_t CommandQueueBase::QueueBreakpoint::HashFunction::operator()(const QueueBreakpoint& v) const
178 {
179 size_t seed = 0;
180 bs_hash_combine(seed, v.queueIdx);
181 bs_hash_combine(seed, v.commandIdx);
182 return seed;
183 }
184
185 inline bool CommandQueueBase::QueueBreakpoint::EqualFunction::operator()(const QueueBreakpoint &a, const QueueBreakpoint &b) const
186 {
187 return a.queueIdx == b.queueIdx && a.commandIdx == b.commandIdx;
188 }
189
190 void CommandQueueBase::addBreakpoint(UINT32 queueIdx, UINT32 commandIdx)
191 {
192 Lock lock(CommandQueueBreakpointMutex);
193
194 SetBreakpoints.insert(QueueBreakpoint(queueIdx, commandIdx));
195 }
196
197 void CommandQueueBase::breakIfNeeded(UINT32 queueIdx, UINT32 commandIdx)
198 {
199 // I purposely don't use a mutex here, as this gets called very often. Generally breakpoints
200 // will only be added at the start of the application, so race conditions should not occur.
201 auto iterFind = SetBreakpoints.find(QueueBreakpoint(queueIdx, commandIdx));
202
203 if(iterFind != SetBreakpoints.end())
204 {
205 assert(false && "Command queue breakpoint triggered!");
206 }
207 }
208#else
209 void CommandQueueBase::addBreakpoint(UINT32 queueIdx, UINT32 commandIdx)
210 {
211 // Do nothing, no breakpoints in release
212 }
213#endif
214}