1// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include <utility>
6
7#include "vm/message_handler.h"
8
9#include "vm/dart.h"
10#include "vm/heap/safepoint.h"
11#include "vm/isolate.h"
12#include "vm/lockers.h"
13#include "vm/object.h"
14#include "vm/object_store.h"
15#include "vm/os.h"
16#include "vm/port.h"
17#include "vm/thread_interrupter.h"
18
19namespace dart {
20
21DECLARE_FLAG(bool, trace_service_pause_events);
22
23class MessageHandlerTask : public ThreadPool::Task {
24 public:
25 explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
26 ASSERT(handler != NULL);
27 }
28
29 virtual void Run() {
30 ASSERT(handler_ != NULL);
31 handler_->TaskCallback();
32 }
33
34 private:
35 MessageHandler* handler_;
36
37 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
38};
39
40// static
41const char* MessageHandler::MessageStatusString(MessageStatus status) {
42 switch (status) {
43 case kOK:
44 return "OK";
45 case kError:
46 return "Error";
47 case kRestart:
48 return "Restart";
49 case kShutdown:
50 return "Shutdown";
51 default:
52 UNREACHABLE();
53 return "Illegal";
54 }
55}
56
57MessageHandler::MessageHandler()
58 : queue_(new MessageQueue()),
59 oob_queue_(new MessageQueue()),
60 oob_message_handling_allowed_(true),
61 paused_for_messages_(false),
62 live_ports_(0),
63 paused_(0),
64#if !defined(PRODUCT)
65 should_pause_on_start_(false),
66 should_pause_on_exit_(false),
67 is_paused_on_start_(false),
68 is_paused_on_exit_(false),
69 paused_timestamp_(-1),
70#endif
71 task_running_(false),
72 delete_me_(false),
73 pool_(NULL),
74 start_callback_(NULL),
75 end_callback_(NULL),
76 callback_data_(0) {
77 ASSERT(queue_ != NULL);
78 ASSERT(oob_queue_ != NULL);
79}
80
81MessageHandler::~MessageHandler() {
82 delete queue_;
83 delete oob_queue_;
84 queue_ = NULL;
85 oob_queue_ = NULL;
86 pool_ = NULL;
87}
88
89const char* MessageHandler::name() const {
90 return "<unnamed>";
91}
92
93#if defined(DEBUG)
94void MessageHandler::CheckAccess() {
95 // By default there is no checking.
96}
97#endif
98
99void MessageHandler::MessageNotify(Message::Priority priority) {
100 // By default, there is no custom message notification.
101}
102
103void MessageHandler::Run(ThreadPool* pool,
104 StartCallback start_callback,
105 EndCallback end_callback,
106 CallbackData data) {
107 MonitorLocker ml(&monitor_);
108 if (FLAG_trace_isolates) {
109 OS::PrintErr(
110 "[+] Starting message handler:\n"
111 "\thandler: %s\n",
112 name());
113 }
114 ASSERT(pool_ == NULL);
115 ASSERT(!delete_me_);
116 pool_ = pool;
117 start_callback_ = start_callback;
118 end_callback_ = end_callback;
119 callback_data_ = data;
120 task_running_ = true;
121 const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
122 ASSERT(launched_successfully);
123}
124
125void MessageHandler::PostMessage(std::unique_ptr<Message> message,
126 bool before_events) {
127 Message::Priority saved_priority;
128
129 {
130 MonitorLocker ml(&monitor_);
131 if (FLAG_trace_isolates) {
132 Isolate* source_isolate = Isolate::Current();
133 if (source_isolate != nullptr) {
134 OS::PrintErr(
135 "[>] Posting message:\n"
136 "\tlen: %" Pd "\n\tsource: (%" Pd64
137 ") %s\n\tdest: %s\n"
138 "\tdest_port: %" Pd64 "\n",
139 message->Size(), static_cast<int64_t>(source_isolate->main_port()),
140 source_isolate->name(), name(), message->dest_port());
141 } else {
142 OS::PrintErr(
143 "[>] Posting message:\n"
144 "\tlen: %" Pd
145 "\n\tsource: <native code>\n"
146 "\tdest: %s\n"
147 "\tdest_port: %" Pd64 "\n",
148 message->Size(), name(), message->dest_port());
149 }
150 }
151
152 saved_priority = message->priority();
153 if (message->IsOOB()) {
154 oob_queue_->Enqueue(std::move(message), before_events);
155 } else {
156 queue_->Enqueue(std::move(message), before_events);
157 }
158 if (paused_for_messages_) {
159 ml.Notify();
160 }
161
162 if (pool_ != nullptr && !task_running_) {
163 ASSERT(!delete_me_);
164 task_running_ = true;
165 const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
166 ASSERT(launched_successfully);
167 }
168 }
169
170 // Invoke any custom message notification.
171 MessageNotify(saved_priority);
172}
173
174std::unique_ptr<Message> MessageHandler::DequeueMessage(
175 Message::Priority min_priority) {
176 // TODO(turnidge): Add assert that monitor_ is held here.
177 std::unique_ptr<Message> message = oob_queue_->Dequeue();
178 if ((message == nullptr) && (min_priority < Message::kOOBPriority)) {
179 message = queue_->Dequeue();
180 }
181 return message;
182}
183
184void MessageHandler::ClearOOBQueue() {
185 oob_queue_->Clear();
186}
187
188MessageHandler::MessageStatus MessageHandler::HandleMessages(
189 MonitorLocker* ml,
190 bool allow_normal_messages,
191 bool allow_multiple_normal_messages) {
192 ASSERT(monitor_.IsOwnedByCurrentThread());
193
194 // Scheduling of the mutator thread during the isolate start can cause this
195 // thread to safepoint.
196 // We want to avoid holding the message handler monitor during the safepoint
197 // operation to avoid possible deadlocks, which can occur if other threads are
198 // sending messages to this message handler.
199 //
200 // If isolate() returns nullptr [StartIsolateScope] does nothing.
201 ml->Exit();
202 StartIsolateScope start_isolate(isolate());
203 ml->Enter();
204
205 auto idle_time_handler =
206 isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;
207
208 MessageStatus max_status = kOK;
209 Message::Priority min_priority =
210 ((allow_normal_messages && !paused()) ? Message::kNormalPriority
211 : Message::kOOBPriority);
212 std::unique_ptr<Message> message = DequeueMessage(min_priority);
213 while (message != nullptr) {
214 intptr_t message_len = message->Size();
215 if (FLAG_trace_isolates) {
216 OS::PrintErr(
217 "[<] Handling message:\n"
218 "\tlen: %" Pd
219 "\n"
220 "\thandler: %s\n"
221 "\tport: %" Pd64 "\n",
222 message_len, name(), message->dest_port());
223 }
224
225 // Release the monitor_ temporarily while we handle the message.
226 // The monitor was acquired in MessageHandler::TaskCallback().
227 ml->Exit();
228 Message::Priority saved_priority = message->priority();
229 Dart_Port saved_dest_port = message->dest_port();
230 MessageStatus status = kOK;
231 {
232 DisableIdleTimerScope disable_idle_timer(idle_time_handler);
233 status = HandleMessage(std::move(message));
234 }
235 if (status > max_status) {
236 max_status = status;
237 }
238 ml->Enter();
239 if (FLAG_trace_isolates) {
240 OS::PrintErr(
241 "[.] Message handled (%s):\n"
242 "\tlen: %" Pd
243 "\n"
244 "\thandler: %s\n"
245 "\tport: %" Pd64 "\n",
246 MessageStatusString(status), message_len, name(), saved_dest_port);
247 }
248 // If we are shutting down, do not process any more messages.
249 if (status == kShutdown) {
250 ClearOOBQueue();
251 break;
252 }
253
254 // Remember time since the last message. Don't consider OOB messages so
255 // using Observatory doesn't trigger additional idle tasks.
256 if ((FLAG_idle_timeout_micros != 0) &&
257 (saved_priority == Message::kNormalPriority)) {
258 if (idle_time_handler != nullptr) {
259 idle_time_handler->UpdateStartIdleTime();
260 }
261 }
262
263 // Some callers want to process only one normal message and then quit. At
264 // the same time it is OK to process multiple OOB messages.
265 if ((saved_priority == Message::kNormalPriority) &&
266 !allow_multiple_normal_messages) {
267 // We processed one normal message. Allow no more.
268 allow_normal_messages = false;
269 }
270
271 // Reevaluate the minimum allowable priority. The paused state
272 // may have changed as part of handling the message. We may also
273 // have encountered an error during message processing.
274 //
275 // Even if we encounter an error, we still process pending OOB
276 // messages so that we don't lose the message notification.
277 min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
278 ? Message::kNormalPriority
279 : Message::kOOBPriority);
280 message = DequeueMessage(min_priority);
281 }
282 return max_status;
283}
284
285MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
286 // We can only call HandleNextMessage when this handler is not
287 // assigned to a thread pool.
288 MonitorLocker ml(&monitor_);
289 ASSERT(pool_ == NULL);
290 ASSERT(!delete_me_);
291#if defined(DEBUG)
292 CheckAccess();
293#endif
294 return HandleMessages(&ml, true, false);
295}
296
297MessageHandler::MessageStatus MessageHandler::PauseAndHandleAllMessages(
298 int64_t timeout_millis) {
299 MonitorLocker ml(&monitor_, /*no_safepoint_scope=*/false);
300 ASSERT(task_running_);
301 ASSERT(!delete_me_);
302#if defined(DEBUG)
303 CheckAccess();
304#endif
305 paused_for_messages_ = true;
306 while (queue_->IsEmpty() && oob_queue_->IsEmpty()) {
307 Monitor::WaitResult wr;
308 {
309 // Ensure this thread is at a safepoint while we wait for new messages to
310 // arrive.
311 TransitionVMToNative transition(Thread::Current());
312 wr = ml.Wait(timeout_millis);
313 }
314 ASSERT(task_running_);
315 ASSERT(!delete_me_);
316 if (wr == Monitor::kTimedOut) {
317 break;
318 }
319 if (queue_->IsEmpty()) {
320 // There are only OOB messages. Handle them and then continue waiting for
321 // normal messages unless there is an error.
322 MessageStatus status = HandleMessages(&ml, false, false);
323 if (status != kOK) {
324 paused_for_messages_ = false;
325 return status;
326 }
327 }
328 }
329 paused_for_messages_ = false;
330 return HandleMessages(&ml, true, true);
331}
332
333MessageHandler::MessageStatus MessageHandler::HandleOOBMessages() {
334 if (!oob_message_handling_allowed_) {
335 return kOK;
336 }
337 MonitorLocker ml(&monitor_);
338 ASSERT(!delete_me_);
339#if defined(DEBUG)
340 CheckAccess();
341#endif
342 return HandleMessages(&ml, false, false);
343}
344
345#if !defined(PRODUCT)
346bool MessageHandler::ShouldPauseOnStart(MessageStatus status) const {
347 Isolate* owning_isolate = isolate();
348 if (owning_isolate == NULL) {
349 return false;
350 }
351 // If we are restarting or shutting down, we do not want to honor
352 // should_pause_on_start or should_pause_on_exit.
353 return (status != MessageHandler::kRestart &&
354 status != MessageHandler::kShutdown) &&
355 should_pause_on_start() && owning_isolate->is_runnable();
356}
357
358bool MessageHandler::ShouldPauseOnExit(MessageStatus status) const {
359 Isolate* owning_isolate = isolate();
360 if (owning_isolate == NULL) {
361 return false;
362 }
363 return (status != MessageHandler::kRestart &&
364 status != MessageHandler::kShutdown) &&
365 should_pause_on_exit() && owning_isolate->is_runnable();
366}
367#endif
368
369bool MessageHandler::HasOOBMessages() {
370 MonitorLocker ml(&monitor_);
371 return !oob_queue_->IsEmpty();
372}
373
374bool MessageHandler::HasMessages() {
375 MonitorLocker ml(&monitor_);
376 return !queue_->IsEmpty();
377}
378
379void MessageHandler::TaskCallback() {
380 ASSERT(Isolate::Current() == NULL);
381 MessageStatus status = kOK;
382 bool run_end_callback = false;
383 bool delete_me = false;
384 EndCallback end_callback = NULL;
385 CallbackData callback_data = 0;
386 {
387 // We will occasionally release and reacquire this monitor in this
388 // function. Whenever we reacquire the monitor we *must* process
389 // all pending OOB messages, or we may miss a request for vm
390 // shutdown.
391 MonitorLocker ml(&monitor_);
392
393 // This method is running on the message handler task. Which means no
394 // other message handler tasks will be started until this one sets
395 // [task_running_] to false.
396 ASSERT(task_running_);
397
398#if !defined(PRODUCT)
399 if (ShouldPauseOnStart(kOK)) {
400 if (!is_paused_on_start()) {
401 PausedOnStartLocked(&ml, true);
402 }
403 // More messages may have come in before we (re)acquired the monitor.
404 status = HandleMessages(&ml, false, false);
405 if (ShouldPauseOnStart(status)) {
406 // Still paused.
407 ASSERT(oob_queue_->IsEmpty());
408 task_running_ = false; // No task in queue.
409 return;
410 } else {
411 PausedOnStartLocked(&ml, false);
412 }
413 }
414 if (is_paused_on_exit()) {
415 status = HandleMessages(&ml, false, false);
416 if (ShouldPauseOnExit(status)) {
417 // Still paused.
418 ASSERT(oob_queue_->IsEmpty());
419 task_running_ = false; // No task in queue.
420 return;
421 } else {
422 PausedOnExitLocked(&ml, false);
423 }
424 }
425#endif // !defined(PRODUCT)
426
427 if (status == kOK) {
428 if (start_callback_ != nullptr) {
429 // Initialize the message handler by running its start function,
430 // if we have one. For an isolate, this will run the isolate's
431 // main() function.
432 //
433 // Release the monitor_ temporarily while we call the start callback.
434 ml.Exit();
435 status = start_callback_(callback_data_);
436 ASSERT(Isolate::Current() == NULL);
437 start_callback_ = NULL;
438 ml.Enter();
439 }
440
441 // Handle any pending messages for this message handler.
442 if (status != kShutdown) {
443 status = HandleMessages(&ml, (status == kOK), true);
444 }
445 }
446
447 // The isolate exits when it encounters an error or when it no
448 // longer has live ports.
449 if (status != kOK || !HasLivePorts()) {
450#if !defined(PRODUCT)
451 if (ShouldPauseOnExit(status)) {
452 if (FLAG_trace_service_pause_events) {
453 OS::PrintErr(
454 "Isolate %s paused before exiting. "
455 "Use the Observatory to release it.\n",
456 name());
457 }
458 PausedOnExitLocked(&ml, true);
459 // More messages may have come in while we released the monitor.
460 status = HandleMessages(&ml, false, false);
461 if (ShouldPauseOnExit(status)) {
462 // Still paused.
463 ASSERT(oob_queue_->IsEmpty());
464 task_running_ = false; // No task in queue.
465 return;
466 } else {
467 PausedOnExitLocked(&ml, false);
468 }
469 }
470#endif // !defined(PRODUCT)
471 if (FLAG_trace_isolates) {
472 if (status != kOK && thread() != NULL) {
473 const Error& error = Error::Handle(thread()->sticky_error());
474 OS::PrintErr(
475 "[-] Stopping message handler (%s):\n"
476 "\thandler: %s\n"
477 "\terror: %s\n",
478 MessageStatusString(status), name(), error.ToCString());
479 } else {
480 OS::PrintErr(
481 "[-] Stopping message handler (%s):\n"
482 "\thandler: %s\n",
483 MessageStatusString(status), name());
484 }
485 }
486 pool_ = NULL;
487 // Decide if we have a callback before releasing the monitor.
488 end_callback = end_callback_;
489 callback_data = callback_data_;
490 run_end_callback = end_callback_ != NULL;
491 delete_me = delete_me_;
492 }
493
494 // Clear task_running_ last. This allows other tasks to potentially start
495 // for this message handler.
496 ASSERT(oob_queue_->IsEmpty());
497 task_running_ = false;
498 }
499
500 // The handler may have been deleted by another thread here if it is a native
501 // message handler.
502
503 // Message handlers either use delete_me or end_callback but not both.
504 ASSERT(!delete_me || !run_end_callback);
505
506 if (run_end_callback) {
507 ASSERT(end_callback != NULL);
508 end_callback(callback_data);
509 // The handler may have been deleted after this point.
510 }
511 if (delete_me) {
512 delete this;
513 }
514}
515
516void MessageHandler::ClosePort(Dart_Port port) {
517 MonitorLocker ml(&monitor_);
518 if (FLAG_trace_isolates) {
519 OS::PrintErr(
520 "[-] Closing port:\n"
521 "\thandler: %s\n"
522 "\tport: %" Pd64
523 "\n"
524 "\tports: live(%" Pd ")\n",
525 name(), port, live_ports_);
526 }
527}
528
529void MessageHandler::CloseAllPorts() {
530 MonitorLocker ml(&monitor_);
531 if (FLAG_trace_isolates) {
532 OS::PrintErr(
533 "[-] Closing all ports:\n"
534 "\thandler: %s\n",
535 name());
536 }
537 queue_->Clear();
538 oob_queue_->Clear();
539}
540
541void MessageHandler::RequestDeletion() {
542 ASSERT(OwnedByPortMap());
543 {
544 MonitorLocker ml(&monitor_);
545 if (task_running_) {
546 // This message handler currently has a task running on the thread pool.
547 delete_me_ = true;
548 return;
549 }
550 }
551
552 // This message handler has no current task. Delete it.
553 delete this;
554}
555
556void MessageHandler::increment_live_ports() {
557 MonitorLocker ml(&monitor_);
558#if defined(DEBUG)
559 CheckAccess();
560#endif
561 live_ports_++;
562}
563
564void MessageHandler::decrement_live_ports() {
565 MonitorLocker ml(&monitor_);
566#if defined(DEBUG)
567 CheckAccess();
568#endif
569 live_ports_--;
570}
571
572#if !defined(PRODUCT)
573void MessageHandler::DebugDump() {
574 PortMap::DebugDumpForMessageHandler(this);
575}
576
577void MessageHandler::PausedOnStart(bool paused) {
578 MonitorLocker ml(&monitor_);
579 PausedOnStartLocked(&ml, paused);
580}
581
582void MessageHandler::PausedOnStartLocked(MonitorLocker* ml, bool paused) {
583 if (paused) {
584 ASSERT(!is_paused_on_start_);
585 ASSERT(paused_timestamp_ == -1);
586 paused_timestamp_ = OS::GetCurrentTimeMillis();
587 // Temporarily release the monitor when calling out to
588 // NotifyPauseOnStart. This avoids a dead lock that can occur
589 // when this message handler tries to post a message while a
590 // message is being posted to it.
591 ml->Exit();
592 NotifyPauseOnStart();
593 ml->Enter();
594 is_paused_on_start_ = true;
595 } else {
596 ASSERT(is_paused_on_start_);
597 ASSERT(paused_timestamp_ != -1);
598 paused_timestamp_ = -1;
599 // Resumed. Clear the resume request of the owning isolate.
600 Isolate* owning_isolate = isolate();
601 if (owning_isolate != NULL) {
602 owning_isolate->GetAndClearResumeRequest();
603 }
604 is_paused_on_start_ = false;
605 }
606}
607
608void MessageHandler::PausedOnExit(bool paused) {
609 MonitorLocker ml(&monitor_);
610 PausedOnExitLocked(&ml, paused);
611}
612
613void MessageHandler::PausedOnExitLocked(MonitorLocker* ml, bool paused) {
614 if (paused) {
615 ASSERT(!is_paused_on_exit_);
616 ASSERT(paused_timestamp_ == -1);
617 paused_timestamp_ = OS::GetCurrentTimeMillis();
618 // Temporarily release the monitor when calling out to
619 // NotifyPauseOnExit. This avoids a dead lock that can
620 // occur when this message handler tries to post a message
621 // while a message is being posted to it.
622 ml->Exit();
623 NotifyPauseOnExit();
624 ml->Enter();
625 is_paused_on_exit_ = true;
626 } else {
627 ASSERT(is_paused_on_exit_);
628 ASSERT(paused_timestamp_ != -1);
629 paused_timestamp_ = -1;
630 // Resumed. Clear the resume request of the owning isolate.
631 Isolate* owning_isolate = isolate();
632 if (owning_isolate != NULL) {
633 owning_isolate->GetAndClearResumeRequest();
634 }
635 is_paused_on_exit_ = false;
636 }
637}
638#endif // !defined(PRODUCT)
639
640MessageHandler::AcquiredQueues::AcquiredQueues(MessageHandler* handler)
641 : handler_(handler), ml_(&handler->monitor_) {
642 ASSERT(handler != NULL);
643 handler_->oob_message_handling_allowed_ = false;
644}
645
646MessageHandler::AcquiredQueues::~AcquiredQueues() {
647 ASSERT(handler_ != NULL);
648 handler_->oob_message_handling_allowed_ = true;
649}
650
651} // namespace dart
652