1//************************************ bs::framework - Copyright 2018 Marko Pintera **************************************//
2//*********** Licensed under the MIT license. See LICENSE.md for full terms. This notice is not to be removed. ***********//
3#pragma once
4
#include "BsCorePrerequisites.h"
#include "Threading/BsAsyncOp.h"
#include <functional>
#include <utility>
8
9namespace bs
10{
11 /** @addtogroup CoreThread-Internal
12 * @{
13 */
14
15 /**
16 * Command queue policy that provides no synchonization. Should be used with command queues that are used on a single
17 * thread only.
18 */
19 class CommandQueueNoSync
20 {
21 public:
22 struct LockGuard { };
23
24 bool isValidThread(ThreadId ownerThread) const
25 {
26 return BS_THREAD_CURRENT_ID == ownerThread;
27 }
28
29 LockGuard lock();
30 };
31
32 /**
33 * Command queue policy that provides synchonization. Should be used with command queues that are used on multiple
34 * threads.
35 */
36 class CommandQueueSync
37 {
38 public:
39 struct LockGuard
40 {
41 Lock lock;
42 };
43
44 bool isValidThread(ThreadId ownerThread) const
45 {
46 return true;
47 }
48
49 LockGuard lock()
50 {
51 return LockGuard { Lock(mCommandQueueMutex) };
52 };
53
54 private:
55 Mutex mCommandQueueMutex;
56 };
57
58 /**
59 * Represents a single queued command in the command list. Contains all the data for executing the command and checking
60 * up on the command status.
61 */
62 struct QueuedCommand
63 {
64#if BS_DEBUG_MODE
65 QueuedCommand(std::function<void(AsyncOp&)> _callback, UINT32 _debugId, const SPtr<AsyncOpSyncData>& asyncOpSyncData,
66 bool _notifyWhenComplete = false, UINT32 _callbackId = 0)
67 : debugId(_debugId), callbackWithReturnValue(_callback), asyncOp(asyncOpSyncData), returnsValue(true)
68 , callbackId(_callbackId), notifyWhenComplete(_notifyWhenComplete)
69 { }
70
71 QueuedCommand(std::function<void()> _callback, UINT32 _debugId, bool _notifyWhenComplete = false, UINT32 _callbackId = 0)
72 :debugId(_debugId), callback(_callback), asyncOp(AsyncOpEmpty()), returnsValue(false), callbackId(_callbackId)
73 , notifyWhenComplete(_notifyWhenComplete)
74 { }
75
76 UINT32 debugId;
77#else
78 QueuedCommand(std::function<void(AsyncOp&)> _callback, const SPtr<AsyncOpSyncData>& asyncOpSyncData,
79 bool _notifyWhenComplete = false, UINT32 _callbackId = 0)
80 : callbackWithReturnValue(_callback), asyncOp(asyncOpSyncData), returnsValue(true), callbackId(_callbackId)
81 , notifyWhenComplete(_notifyWhenComplete)
82 { }
83
84 QueuedCommand(std::function<void()> _callback, bool _notifyWhenComplete = false, UINT32 _callbackId = 0)
85 : callback(_callback), asyncOp(AsyncOpEmpty()), returnsValue(false), callbackId(_callbackId)
86 , notifyWhenComplete(_notifyWhenComplete)
87 { }
88#endif
89
90 ~QueuedCommand()
91 { }
92
93 QueuedCommand(const QueuedCommand& source)
94 {
95 callback = source.callback;
96 callbackWithReturnValue = source.callbackWithReturnValue;
97 asyncOp = source.asyncOp;
98 returnsValue = source.returnsValue;
99 callbackId = source.callbackId;
100 notifyWhenComplete = source.notifyWhenComplete;
101
102#if BS_DEBUG_MODE
103 debugId = source.debugId;
104#endif
105 }
106
107 QueuedCommand& operator=(const QueuedCommand& rhs)
108 {
109 callback = rhs.callback;
110 callbackWithReturnValue = rhs.callbackWithReturnValue;
111 asyncOp = rhs.asyncOp;
112 returnsValue = rhs.returnsValue;
113 callbackId = rhs.callbackId;
114 notifyWhenComplete = rhs.notifyWhenComplete;
115
116#if BS_DEBUG_MODE
117 debugId = rhs.debugId;
118#endif
119
120 return *this;
121 }
122
123 std::function<void()> callback;
124 std::function<void(AsyncOp&)> callbackWithReturnValue;
125 AsyncOp asyncOp;
126 bool returnsValue;
127 UINT32 callbackId;
128 bool notifyWhenComplete;
129 };
130
131 /** Manages a list of commands that can be queued for later execution on the core thread. */
132 class BS_CORE_EXPORT CommandQueueBase
133 {
134 public:
135 /**
136 * Constructor.
137 *
138 * @param[in] threadId Identifier for the thread the command queue will be getting commands from.
139 */
140 CommandQueueBase(ThreadId threadId);
141
142 /**
143 * Gets the thread identifier the command queue is used on.
144 *
145 * @note If the command queue is using a synchonized access policy generally this is not relevant as it may be
146 * used on multiple threads.
147 */
148 ThreadId getThreadId() const { return mMyThreadId; }
149
150 /**
151 * Executes all provided commands one by one in order. To get the commands you should call flush().
152 *
153 * @param[in] commands Commands to execute.
154 * @param[in] notifyCallback Callback that will be called if a command that has @p notifyOnComplete flag set.
155 * The callback will receive @p callbackId of the command.
156 */
157 void playbackWithNotify(Queue<QueuedCommand>* commands, std::function<void(UINT32)> notifyCallback);
158
159 /** Executes all provided commands one by one in order. To get the commands you should call flush(). */
160 void playback(Queue<QueuedCommand>* commands);
161
162 /**
163 * Allows you to set a breakpoint that will trigger when the specified command is executed.
164 *
165 * @param[in] queueIdx Zero-based index of the queue the command was queued on.
166 * @param[in] commandIdx Zero-based index of the command.
167 *
168 * @note
169 * This is helpful when you receive an error on the executing thread and you cannot tell from where was the command
170 * that caused the error queued from. However you can make a note of the queue and command index and set a
171 * breakpoint so that it gets triggered next time you run the program. At that point you can know exactly which part
172 * of code queued the command by examining the stack trace.
173 */
174 static void addBreakpoint(UINT32 queueIdx, UINT32 commandIdx);
175
176 /**
177 * Queue up a new command to execute. Make sure the provided function has all of its parameters properly bound.
178 * Last parameter must be unbound and of AsyncOp& type. This is used to signal that the command is completed, and
179 * also for storing the return value.
180 *
181 * @param[in] commandCallback Command to queue for execution.
182 * @param[in] _notifyWhenComplete (optional) Call the notify method (provided in the call to playback())
183 * when the command is complete.
184 * @param[in] _callbackId (optional) Identifier for the callback so you can then later find it
185 * if needed.
186 *
187 * @return Async operation object that you can continuously check until the command
188 * completes. After it completes AsyncOp::isResolved() will return true and return
189 * data will be valid (if the callback provided any).
190 *
191 * @note
192 * Callback method also needs to call AsyncOp::markAsResolved once it is done processing. (If it doesn't it will
193 * still be called automatically, but the return value will default to nullptr)
194 */
195 AsyncOp queueReturn(std::function<void(AsyncOp&)> commandCallback, bool _notifyWhenComplete = false, UINT32 _callbackId = 0);
196
197 /**
198 * Queue up a new command to execute. Make sure the provided function has all of its parameters properly bound.
199 * Provided command is not expected to return a value. If you wish to return a value from the callback use the
200 * queueReturn() which accepts an AsyncOp parameter.
201 *
202 * @param[in] commandCallback Command to queue for execution.
203 * @param[in] _notifyWhenComplete (optional) Call the notify method (provided in the call to playback())
204 * when the command is complete.
205 * @param[in] _callbackId (optional) Identifier for the callback so you can then later find
206 * it if needed.
207 */
208 void queue(std::function<void()> commandCallback, bool _notifyWhenComplete = false, UINT32 _callbackId = 0);
209
210 /**
211 * Returns a copy of all queued commands and makes room for new ones. Must be called from the thread that created
212 * the command queue. Returned commands must be passed to playback() method.
213 */
214 Queue<QueuedCommand>* flush();
215
216 /** Cancels all currently queued commands. */
217 void cancelAll();
218
219 /** Returns true if no commands are queued. */
220 bool isEmpty();
221
222 protected:
223 ~CommandQueueBase();
224
225 /**
226 * Helper method that throws an "Invalid thread" exception. Used primarily so we can avoid including Exception
227 * include in this header.
228 */
229 void throwInvalidThreadException(const String& message) const;
230
231 private:
232 Queue<QueuedCommand>* mCommands;
233
234 SPtr<AsyncOpSyncData> mAsyncOpSyncData;
235 ThreadId mMyThreadId;
236
237 Stack<Queue<QueuedCommand>*> mEmptyCommandQueues; /**< List of empty queues for reuse. */
238 Mutex mEmptyCommandQueueMutex;
239
240 // Various variables that allow for easier debugging by allowing us to trigger breakpoints
241 // when a certain command was queued.
242#if BS_DEBUG_MODE
243 struct QueueBreakpoint
244 {
245 class HashFunction
246 {
247 public:
248 size_t operator()(const QueueBreakpoint &key) const;
249 };
250
251 class EqualFunction
252 {
253 public:
254 bool operator()(const QueueBreakpoint &a, const QueueBreakpoint &b) const;
255 };
256
257 QueueBreakpoint(UINT32 _queueIdx, UINT32 _commandIdx)
258 :queueIdx(_queueIdx), commandIdx(_commandIdx)
259 { }
260
261 UINT32 queueIdx;
262 UINT32 commandIdx;
263
264 inline size_t operator()(const QueueBreakpoint& v) const;
265 };
266
267 UINT32 mMaxDebugIdx;
268 UINT32 mCommandQueueIdx;
269
270 static UINT32 MaxCommandQueueIdx;
271 static UnorderedSet<QueueBreakpoint, QueueBreakpoint::HashFunction, QueueBreakpoint::EqualFunction> SetBreakpoints;
272 static Mutex CommandQueueBreakpointMutex;
273
274 /** Checks if the specified command has a breakpoint and throw an assert if it does. */
275 static void breakIfNeeded(UINT32 queueIdx, UINT32 commandIdx);
276#endif
277 };
278
279 /**
280 * @copydoc CommandQueueBase
281 *
282 * Use SyncPolicy to choose whether you want command queue be synchonized or not. Synchonized command queues may be
283 * used across multiple threads and non-synchonized only on one.
284 */
285 template<class SyncPolicy = CommandQueueNoSync>
286 class CommandQueue : public CommandQueueBase, public SyncPolicy
287 {
288 public:
289 /** @copydoc CommandQueueBase::CommandQueueBase */
290 CommandQueue(ThreadId threadId)
291 :CommandQueueBase(threadId)
292 { }
293
294 ~CommandQueue()
295 { }
296
297 /** @copydoc CommandQueueBase::queueReturn */
298 AsyncOp queueReturn(std::function<void(AsyncOp&)> commandCallback, bool _notifyWhenComplete = false, UINT32 _callbackId = 0)
299 {
300#if BS_DEBUG_MODE
301#if BS_THREAD_SUPPORT != 0
302 if(!this->isValidThread(getThreadId()))
303 throwInvalidThreadException("Command queue accessed outside of its creation thread.");
304#endif
305#endif
306
307 typename SyncPolicy::LockGuard lockGuard = this->lock();
308 AsyncOp asyncOp = CommandQueueBase::queueReturn(commandCallback, _notifyWhenComplete, _callbackId);
309
310 return asyncOp;
311 }
312
313 /** @copydoc CommandQueueBase::queue */
314 void queue(std::function<void()> commandCallback, bool _notifyWhenComplete = false, UINT32 _callbackId = 0)
315 {
316#if BS_DEBUG_MODE
317#if BS_THREAD_SUPPORT != 0
318 if(!this->isValidThread(getThreadId()))
319 throwInvalidThreadException("Command queue accessed outside of its creation thread.");
320#endif
321#endif
322
323 typename SyncPolicy::LockGuard lockGuard = this->lock();
324 CommandQueueBase::queue(commandCallback, _notifyWhenComplete, _callbackId);
325 }
326
327 /** @copydoc CommandQueueBase::flush */
328 bs::Queue<QueuedCommand>* flush()
329 {
330#if BS_DEBUG_MODE
331#if BS_THREAD_SUPPORT != 0
332 if(!this->isValidThread(getThreadId()))
333 throwInvalidThreadException("Command queue accessed outside of its creation thread.");
334#endif
335#endif
336
337 typename SyncPolicy::LockGuard lockGuard = this->lock();
338 Queue<QueuedCommand>* commands = CommandQueueBase::flush();
339
340 return commands;
341 }
342
343 /** @copydoc CommandQueueBase::cancelAll */
344 void cancelAll()
345 {
346#if BS_DEBUG_MODE
347#if BS_THREAD_SUPPORT != 0
348 if(!this->isValidThread(getThreadId()))
349 throwInvalidThreadException("Command queue accessed outside of its creation thread.");
350#endif
351#endif
352
353 typename SyncPolicy::LockGuard lockGuard = this->lock();
354 CommandQueueBase::cancelAll();
355 }
356
357 /** @copydoc CommandQueueBase::isEmpty */
358 bool isEmpty()
359 {
360#if BS_DEBUG_MODE
361#if BS_THREAD_SUPPORT != 0
362 if(!this->isValidThread(getThreadId()))
363 throwInvalidThreadException("Command queue accessed outside of its creation thread.");
364#endif
365#endif
366
367 typename SyncPolicy::LockGuard lockGuard = this->lock();
368 bool empty = CommandQueueBase::isEmpty();
369
370 return empty;
371 }
372 };
373
374 /** @} */
375}