1 | /**************************************************************************/ |
2 | /* message_queue.cpp */ |
3 | /**************************************************************************/ |
4 | /* This file is part of: */ |
5 | /* GODOT ENGINE */ |
6 | /* https://godotengine.org */ |
7 | /**************************************************************************/ |
8 | /* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */ |
9 | /* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */ |
10 | /* */ |
11 | /* Permission is hereby granted, free of charge, to any person obtaining */ |
12 | /* a copy of this software and associated documentation files (the */ |
13 | /* "Software"), to deal in the Software without restriction, including */ |
14 | /* without limitation the rights to use, copy, modify, merge, publish, */ |
15 | /* distribute, sublicense, and/or sell copies of the Software, and to */ |
16 | /* permit persons to whom the Software is furnished to do so, subject to */ |
17 | /* the following conditions: */ |
18 | /* */ |
19 | /* The above copyright notice and this permission notice shall be */ |
20 | /* included in all copies or substantial portions of the Software. */ |
21 | /* */ |
22 | /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ |
23 | /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ |
24 | /* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */ |
25 | /* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ |
26 | /* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ |
27 | /* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ |
28 | /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ |
29 | /**************************************************************************/ |
30 | |
31 | #include "message_queue.h" |
32 | |
33 | #include "core/config/project_settings.h" |
34 | #include "core/core_string_names.h" |
35 | #include "core/object/class_db.h" |
36 | #include "core/object/script_language.h" |
37 | |
// LOCK_MUTEX / UNLOCK_MUTEX guard access to a queue's internal state.
// A queue that is the calling thread's singleton override skips the mutex entirely.
//
// Both macros are wrapped in do { } while (0) so that `LOCK_MUTEX;` expands to exactly
// one statement; a bare `if` expansion could capture a following `else` and leaves a
// stray empty statement behind the trailing semicolon.
#ifdef DEV_ENABLED
// Includes sanity checks to ensure that a queue set as a thread singleton override
// is only ever called from the thread it was set for.
#define LOCK_MUTEX \
	do { \
		if (this != MessageQueue::thread_singleton) { \
			DEV_ASSERT(!this->is_current_thread_override); \
			mutex.lock(); \
		} else { \
			DEV_ASSERT(this->is_current_thread_override); \
		} \
	} while (0)
#else
#define LOCK_MUTEX \
	do { \
		if (this != MessageQueue::thread_singleton) { \
			mutex.lock(); \
		} \
	} while (0)
#endif

#define UNLOCK_MUTEX \
	do { \
		if (this != MessageQueue::thread_singleton) { \
			mutex.unlock(); \
		} \
	} while (0)
59 | |
60 | void CallQueue::_add_page() { |
61 | if (pages_used == page_bytes.size()) { |
62 | pages.push_back(allocator->alloc()); |
63 | page_bytes.push_back(0); |
64 | } |
65 | page_bytes[pages_used] = 0; |
66 | pages_used++; |
67 | } |
68 | |
69 | Error CallQueue::push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) { |
70 | return push_callablep(Callable(p_id, p_method), p_args, p_argcount, p_show_error); |
71 | } |
72 | |
73 | Error CallQueue::push_callp(Object *p_object, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) { |
74 | return push_callp(p_object->get_instance_id(), p_method, p_args, p_argcount, p_show_error); |
75 | } |
76 | |
77 | Error CallQueue::push_notification(Object *p_object, int p_notification) { |
78 | return push_notification(p_object->get_instance_id(), p_notification); |
79 | } |
80 | |
81 | Error CallQueue::push_set(Object *p_object, const StringName &p_prop, const Variant &p_value) { |
82 | return push_set(p_object->get_instance_id(), p_prop, p_value); |
83 | } |
84 | |
85 | Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_args, int p_argcount, bool p_show_error) { |
86 | uint32_t room_needed = sizeof(Message) + sizeof(Variant) * p_argcount; |
87 | |
88 | ERR_FAIL_COND_V_MSG(room_needed > uint32_t(PAGE_SIZE_BYTES), ERR_INVALID_PARAMETER, "Message is too large to fit on a page (" + itos(PAGE_SIZE_BYTES) + " bytes), consider passing less arguments." ); |
89 | |
90 | LOCK_MUTEX; |
91 | |
92 | _ensure_first_page(); |
93 | |
94 | if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { |
95 | if (pages_used == max_pages) { |
96 | ERR_PRINT("Failed method: " + p_callable + ". Message queue out of memory. " + error_text); |
97 | statistics(); |
98 | UNLOCK_MUTEX; |
99 | return ERR_OUT_OF_MEMORY; |
100 | } |
101 | _add_page(); |
102 | } |
103 | |
104 | Page *page = pages[pages_used - 1]; |
105 | |
106 | uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]]; |
107 | |
108 | Message *msg = memnew_placement(buffer_end, Message); |
109 | msg->args = p_argcount; |
110 | msg->callable = p_callable; |
111 | msg->type = TYPE_CALL; |
112 | if (p_show_error) { |
113 | msg->type |= FLAG_SHOW_ERROR; |
114 | } |
115 | // Support callables of static methods. |
116 | if (p_callable.get_object_id().is_null() && p_callable.is_valid()) { |
117 | msg->type |= FLAG_NULL_IS_OK; |
118 | } |
119 | |
120 | buffer_end += sizeof(Message); |
121 | |
122 | for (int i = 0; i < p_argcount; i++) { |
123 | Variant *v = memnew_placement(buffer_end, Variant); |
124 | buffer_end += sizeof(Variant); |
125 | *v = *p_args[i]; |
126 | } |
127 | |
128 | page_bytes[pages_used - 1] += room_needed; |
129 | |
130 | UNLOCK_MUTEX; |
131 | |
132 | return OK; |
133 | } |
134 | |
135 | Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value) { |
136 | LOCK_MUTEX; |
137 | uint32_t room_needed = sizeof(Message) + sizeof(Variant); |
138 | |
139 | _ensure_first_page(); |
140 | |
141 | if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { |
142 | if (pages_used == max_pages) { |
143 | String type; |
144 | if (ObjectDB::get_instance(p_id)) { |
145 | type = ObjectDB::get_instance(p_id)->get_class(); |
146 | } |
147 | ERR_PRINT("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text); |
148 | statistics(); |
149 | |
150 | UNLOCK_MUTEX; |
151 | return ERR_OUT_OF_MEMORY; |
152 | } |
153 | _add_page(); |
154 | } |
155 | |
156 | Page *page = pages[pages_used - 1]; |
157 | uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]]; |
158 | |
159 | Message *msg = memnew_placement(buffer_end, Message); |
160 | msg->args = 1; |
161 | msg->callable = Callable(p_id, p_prop); |
162 | msg->type = TYPE_SET; |
163 | |
164 | buffer_end += sizeof(Message); |
165 | |
166 | Variant *v = memnew_placement(buffer_end, Variant); |
167 | *v = p_value; |
168 | |
169 | page_bytes[pages_used - 1] += room_needed; |
170 | UNLOCK_MUTEX; |
171 | |
172 | return OK; |
173 | } |
174 | |
175 | Error CallQueue::push_notification(ObjectID p_id, int p_notification) { |
176 | ERR_FAIL_COND_V(p_notification < 0, ERR_INVALID_PARAMETER); |
177 | LOCK_MUTEX; |
178 | uint32_t room_needed = sizeof(Message); |
179 | |
180 | _ensure_first_page(); |
181 | |
182 | if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { |
183 | if (pages_used == max_pages) { |
184 | ERR_PRINT("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text); |
185 | statistics(); |
186 | UNLOCK_MUTEX; |
187 | return ERR_OUT_OF_MEMORY; |
188 | } |
189 | _add_page(); |
190 | } |
191 | |
192 | Page *page = pages[pages_used - 1]; |
193 | uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]]; |
194 | |
195 | Message *msg = memnew_placement(buffer_end, Message); |
196 | |
197 | msg->type = TYPE_NOTIFICATION; |
198 | msg->callable = Callable(p_id, CoreStringNames::get_singleton()->notification); //name is meaningless but callable needs it |
199 | //msg->target; |
200 | msg->notification = p_notification; |
201 | |
202 | page_bytes[pages_used - 1] += room_needed; |
203 | UNLOCK_MUTEX; |
204 | |
205 | return OK; |
206 | } |
207 | |
208 | void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error) { |
209 | const Variant **argptrs = nullptr; |
210 | if (p_argcount) { |
211 | argptrs = (const Variant **)alloca(sizeof(Variant *) * p_argcount); |
212 | for (int i = 0; i < p_argcount; i++) { |
213 | argptrs[i] = &p_args[i]; |
214 | } |
215 | } |
216 | |
217 | Callable::CallError ce; |
218 | Variant ret; |
219 | p_callable.callp(argptrs, p_argcount, ret, ce); |
220 | if (p_show_error && ce.error != Callable::CallError::CALL_OK) { |
221 | ERR_PRINT("Error calling deferred method: " + Variant::get_callable_error_text(p_callable, argptrs, p_argcount, ce) + "." ); |
222 | } |
223 | } |
224 | |
225 | Error CallQueue::_transfer_messages_to_main_queue() { |
226 | if (pages.size() == 0) { |
227 | return OK; |
228 | } |
229 | |
230 | CallQueue *mq = MessageQueue::main_singleton; |
231 | DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters. |
232 | |
233 | mq->mutex.lock(); |
234 | |
235 | // Here we're transferring the data from this queue to the main one. |
236 | // However, it's very unlikely big amounts of messages will be queued here, |
237 | // so PagedArray/Pool would be overkill. Also, in most cases the data will fit |
238 | // an already existing page of the main queue. |
239 | |
240 | // Let's see if our first (likely only) page fits the current target queue page. |
241 | uint32_t src_page = 0; |
242 | { |
243 | if (mq->pages_used) { |
244 | uint32_t dst_page = mq->pages_used - 1; |
245 | uint32_t dst_offset = mq->page_bytes[dst_page]; |
246 | if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) { |
247 | memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]); |
248 | mq->page_bytes[dst_page] += page_bytes[0]; |
249 | src_page++; |
250 | } |
251 | } |
252 | } |
253 | |
254 | // Any other possibly existing source page needs to be added. |
255 | |
256 | if (mq->pages_used + (pages_used - src_page) > mq->max_pages) { |
257 | ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text); |
258 | mq->statistics(); |
259 | mq->mutex.unlock(); |
260 | return ERR_OUT_OF_MEMORY; |
261 | } |
262 | |
263 | for (; src_page < pages_used; src_page++) { |
264 | mq->_add_page(); |
265 | memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]); |
266 | mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page]; |
267 | } |
268 | |
269 | mq->mutex.unlock(); |
270 | |
271 | page_bytes[0] = 0; |
272 | pages_used = 1; |
273 | |
274 | return OK; |
275 | } |
276 | |
277 | Error CallQueue::flush() { |
278 | // Thread overrides are not meant to be flushed, but appended to the main one. |
279 | if (unlikely(this == MessageQueue::thread_singleton)) { |
280 | return _transfer_messages_to_main_queue(); |
281 | } |
282 | |
283 | LOCK_MUTEX; |
284 | |
285 | if (pages.size() == 0) { |
286 | // Never allocated |
287 | UNLOCK_MUTEX; |
288 | return OK; // Do nothing. |
289 | } |
290 | |
291 | if (flushing) { |
292 | UNLOCK_MUTEX; |
293 | return ERR_BUSY; |
294 | } |
295 | |
296 | flushing = true; |
297 | |
298 | uint32_t i = 0; |
299 | uint32_t offset = 0; |
300 | |
301 | while (i < pages_used && offset < page_bytes[i]) { |
302 | Page *page = pages[i]; |
303 | |
304 | //lock on each iteration, so a call can re-add itself to the message queue |
305 | |
306 | Message *message = (Message *)&page->data[offset]; |
307 | |
308 | uint32_t advance = sizeof(Message); |
309 | if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { |
310 | advance += sizeof(Variant) * message->args; |
311 | } |
312 | |
313 | //pre-advance so this function is reentrant |
314 | offset += advance; |
315 | |
316 | Object *target = message->callable.get_object(); |
317 | |
318 | UNLOCK_MUTEX; |
319 | |
320 | switch (message->type & FLAG_MASK) { |
321 | case TYPE_CALL: { |
322 | if (target || (message->type & FLAG_NULL_IS_OK)) { |
323 | Variant *args = (Variant *)(message + 1); |
324 | _call_function(message->callable, args, message->args, message->type & FLAG_SHOW_ERROR); |
325 | } |
326 | } break; |
327 | case TYPE_NOTIFICATION: { |
328 | if (target) { |
329 | target->notification(message->notification); |
330 | } |
331 | } break; |
332 | case TYPE_SET: { |
333 | if (target) { |
334 | Variant *arg = (Variant *)(message + 1); |
335 | target->set(message->callable.get_method(), *arg); |
336 | } |
337 | } break; |
338 | } |
339 | |
340 | if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { |
341 | Variant *args = (Variant *)(message + 1); |
342 | for (int k = 0; k < message->args; k++) { |
343 | args[k].~Variant(); |
344 | } |
345 | } |
346 | |
347 | message->~Message(); |
348 | |
349 | LOCK_MUTEX; |
350 | if (offset == page_bytes[i]) { |
351 | i++; |
352 | offset = 0; |
353 | } |
354 | } |
355 | |
356 | page_bytes[0] = 0; |
357 | pages_used = 1; |
358 | |
359 | flushing = false; |
360 | UNLOCK_MUTEX; |
361 | return OK; |
362 | } |
363 | |
364 | void CallQueue::clear() { |
365 | LOCK_MUTEX; |
366 | |
367 | if (pages.size() == 0) { |
368 | UNLOCK_MUTEX; |
369 | return; // Nothing to clear. |
370 | } |
371 | |
372 | for (uint32_t i = 0; i < pages_used; i++) { |
373 | uint32_t offset = 0; |
374 | while (offset < page_bytes[i]) { |
375 | Page *page = pages[i]; |
376 | |
377 | //lock on each iteration, so a call can re-add itself to the message queue |
378 | |
379 | Message *message = (Message *)&page->data[offset]; |
380 | |
381 | uint32_t advance = sizeof(Message); |
382 | if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { |
383 | advance += sizeof(Variant) * message->args; |
384 | } |
385 | |
386 | offset += advance; |
387 | |
388 | if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { |
389 | Variant *args = (Variant *)(message + 1); |
390 | for (int k = 0; k < message->args; k++) { |
391 | args[k].~Variant(); |
392 | } |
393 | } |
394 | |
395 | message->~Message(); |
396 | } |
397 | } |
398 | |
399 | pages_used = 1; |
400 | page_bytes[0] = 0; |
401 | |
402 | UNLOCK_MUTEX; |
403 | } |
404 | |
405 | void CallQueue::statistics() { |
406 | LOCK_MUTEX; |
407 | HashMap<StringName, int> set_count; |
408 | HashMap<int, int> notify_count; |
409 | HashMap<Callable, int> call_count; |
410 | int null_count = 0; |
411 | |
412 | for (uint32_t i = 0; i < pages_used; i++) { |
413 | uint32_t offset = 0; |
414 | while (offset < page_bytes[i]) { |
415 | Page *page = pages[i]; |
416 | |
417 | //lock on each iteration, so a call can re-add itself to the message queue |
418 | |
419 | Message *message = (Message *)&page->data[offset]; |
420 | |
421 | uint32_t advance = sizeof(Message); |
422 | if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { |
423 | advance += sizeof(Variant) * message->args; |
424 | } |
425 | |
426 | Object *target = message->callable.get_object(); |
427 | |
428 | bool null_target = true; |
429 | switch (message->type & FLAG_MASK) { |
430 | case TYPE_CALL: { |
431 | if (target || (message->type & FLAG_NULL_IS_OK)) { |
432 | if (!call_count.has(message->callable)) { |
433 | call_count[message->callable] = 0; |
434 | } |
435 | |
436 | call_count[message->callable]++; |
437 | null_target = false; |
438 | } |
439 | } break; |
440 | case TYPE_NOTIFICATION: { |
441 | if (target) { |
442 | if (!notify_count.has(message->notification)) { |
443 | notify_count[message->notification] = 0; |
444 | } |
445 | |
446 | notify_count[message->notification]++; |
447 | null_target = false; |
448 | } |
449 | } break; |
450 | case TYPE_SET: { |
451 | if (target) { |
452 | StringName t = message->callable.get_method(); |
453 | if (!set_count.has(t)) { |
454 | set_count[t] = 0; |
455 | } |
456 | |
457 | set_count[t]++; |
458 | null_target = false; |
459 | } |
460 | } break; |
461 | } |
462 | if (null_target) { |
463 | //object was deleted |
464 | print_line("Object was deleted while awaiting a callback" ); |
465 | |
466 | null_count++; |
467 | } |
468 | |
469 | offset += advance; |
470 | |
471 | if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { |
472 | Variant *args = (Variant *)(message + 1); |
473 | for (int k = 0; k < message->args; k++) { |
474 | args[k].~Variant(); |
475 | } |
476 | } |
477 | |
478 | message->~Message(); |
479 | } |
480 | } |
481 | |
482 | print_line("TOTAL PAGES: " + itos(pages_used) + " (" + itos(pages_used * PAGE_SIZE_BYTES) + " bytes)." ); |
483 | print_line("NULL count: " + itos(null_count)); |
484 | |
485 | for (const KeyValue<StringName, int> &E : set_count) { |
486 | print_line("SET " + E.key + ": " + itos(E.value)); |
487 | } |
488 | |
489 | for (const KeyValue<Callable, int> &E : call_count) { |
490 | print_line("CALL " + E.key + ": " + itos(E.value)); |
491 | } |
492 | |
493 | for (const KeyValue<int, int> &E : notify_count) { |
494 | print_line("NOTIFY " + itos(E.key) + ": " + itos(E.value)); |
495 | } |
496 | |
497 | UNLOCK_MUTEX; |
498 | } |
499 | |
500 | bool CallQueue::is_flushing() const { |
501 | return flushing; |
502 | } |
503 | |
504 | bool CallQueue::has_messages() const { |
505 | if (pages_used == 0) { |
506 | return false; |
507 | } |
508 | if (pages_used == 1 && page_bytes[0] == 0) { |
509 | return false; |
510 | } |
511 | |
512 | return true; |
513 | } |
514 | |
515 | int CallQueue::get_max_buffer_usage() const { |
516 | return pages.size() * PAGE_SIZE_BYTES; |
517 | } |
518 | |
519 | CallQueue::CallQueue(Allocator *p_custom_allocator, uint32_t p_max_pages, const String &p_error_text) { |
520 | if (p_custom_allocator) { |
521 | allocator = p_custom_allocator; |
522 | allocator_is_custom = true; |
523 | } else { |
524 | allocator = memnew(Allocator(16)); // 16 elements per allocator page, 64kb per allocator page. Anything small will do, though. |
525 | allocator_is_custom = false; |
526 | } |
527 | max_pages = p_max_pages; |
528 | error_text = p_error_text; |
529 | } |
530 | |
531 | CallQueue::~CallQueue() { |
532 | clear(); |
533 | // Let go of pages. |
534 | for (uint32_t i = 0; i < pages.size(); i++) { |
535 | allocator->free(pages[i]); |
536 | } |
537 | if (!allocator_is_custom) { |
538 | memdelete(allocator); |
539 | } |
540 | // This is done here to avoid a circular dependency between the sanity checks and the thread singleton pointer. |
541 | if (this == MessageQueue::thread_singleton) { |
542 | MessageQueue::thread_singleton = nullptr; |
543 | } |
544 | } |
545 | |
546 | ////////////////////// |
547 | |
548 | CallQueue *MessageQueue::main_singleton = nullptr; |
549 | thread_local CallQueue *MessageQueue::thread_singleton = nullptr; |
550 | |
551 | void MessageQueue::set_thread_singleton_override(CallQueue *p_thread_singleton) { |
552 | DEV_ASSERT(p_thread_singleton); // To unset the thread singleton, don't call this with nullptr, but just memfree() it. |
553 | #ifdef DEV_ENABLED |
554 | if (thread_singleton) { |
555 | thread_singleton->is_current_thread_override = false; |
556 | } |
557 | #endif |
558 | thread_singleton = p_thread_singleton; |
559 | #ifdef DEV_ENABLED |
560 | if (thread_singleton) { |
561 | thread_singleton->is_current_thread_override = true; |
562 | } |
563 | #endif |
564 | } |
565 | |
566 | MessageQueue::MessageQueue() : |
567 | CallQueue(nullptr, |
568 | int(GLOBAL_DEF_RST(PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb" , PROPERTY_HINT_RANGE, "1,512,1,or_greater" ), 32)) * 1024 * 1024 / PAGE_SIZE_BYTES, |
569 | "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings." ) { |
570 | ERR_FAIL_COND_MSG(main_singleton != nullptr, "A MessageQueue singleton already exists." ); |
571 | main_singleton = this; |
572 | } |
573 | |
574 | MessageQueue::~MessageQueue() { |
575 | main_singleton = nullptr; |
576 | } |
577 | |