1// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "vm/message.h"
6
7#include <utility>
8
9#include "vm/dart_entry.h"
10#include "vm/json_stream.h"
11#include "vm/object.h"
12#include "vm/port.h"
13
14namespace dart {
15
16const Dart_Port Message::kIllegalPort = 0;
17
18Message::Message(Dart_Port dest_port,
19 uint8_t* snapshot,
20 intptr_t snapshot_length,
21 MessageFinalizableData* finalizable_data,
22 Priority priority,
23 Dart_Port delivery_failure_port)
24 : next_(NULL),
25 dest_port_(dest_port),
26 delivery_failure_port_(delivery_failure_port),
27 payload_(snapshot),
28 snapshot_length_(snapshot_length),
29 finalizable_data_(finalizable_data),
30 priority_(priority) {
31 ASSERT((priority == kNormalPriority) ||
32 (delivery_failure_port == kIllegalPort));
33 ASSERT(IsSnapshot());
34}
35
36Message::Message(Dart_Port dest_port,
37 ObjectPtr raw_obj,
38 Priority priority,
39 Dart_Port delivery_failure_port)
40 : next_(NULL),
41 dest_port_(dest_port),
42 delivery_failure_port_(delivery_failure_port),
43 payload_(raw_obj),
44 snapshot_length_(0),
45 finalizable_data_(NULL),
46 priority_(priority) {
47 ASSERT(!raw_obj->IsHeapObject() || raw_obj->ptr()->InVMIsolateHeap());
48 ASSERT((priority == kNormalPriority) ||
49 (delivery_failure_port == kIllegalPort));
50 ASSERT(IsRaw());
51}
52
53Message::Message(Dart_Port dest_port,
54 Bequest* bequest,
55 Priority priority,
56 Dart_Port delivery_failure_port)
57 : next_(nullptr),
58 dest_port_(dest_port),
59 delivery_failure_port_(delivery_failure_port),
60 payload_(bequest),
61 snapshot_length_(-1),
62 finalizable_data_(nullptr),
63 priority_(priority) {
64 ASSERT((priority == kNormalPriority) ||
65 (delivery_failure_port == kIllegalPort));
66 ASSERT(IsBequest());
67}
68
69Message::~Message() {
70 ASSERT(delivery_failure_port_ == kIllegalPort);
71 if (IsSnapshot()) {
72 free(payload_.snapshot_);
73 }
74 delete finalizable_data_;
75 if (IsBequest()) {
76 delete (payload_.bequest_);
77 }
78}
79
80bool Message::RedirectToDeliveryFailurePort() {
81 if (delivery_failure_port_ == kIllegalPort) {
82 return false;
83 }
84 dest_port_ = delivery_failure_port_;
85 delivery_failure_port_ = kIllegalPort;
86 return true;
87}
88
89intptr_t Message::Id() const {
90 // Messages are allocated on the C heap. Use the raw address as the id.
91 return reinterpret_cast<intptr_t>(this);
92}
93
94const char* Message::PriorityAsString(Priority priority) {
95 switch (priority) {
96 case kNormalPriority:
97 return "Normal";
98 break;
99 case kOOBPriority:
100 return "OOB";
101 break;
102 default:
103 UNIMPLEMENTED();
104 return NULL;
105 }
106}
107
108MessageQueue::MessageQueue() {
109 head_ = NULL;
110 tail_ = NULL;
111}
112
113MessageQueue::~MessageQueue() {
114 // Ensure that all pending messages have been released.
115 Clear();
116 ASSERT(head_ == NULL);
117}
118
119void MessageQueue::Enqueue(std::unique_ptr<Message> msg0, bool before_events) {
120 // TODO(mdempsky): Use unique_ptr internally?
121 Message* msg = msg0.release();
122
123 // Make sure messages are not reused.
124 ASSERT(msg->next_ == NULL);
125 if (head_ == NULL) {
126 // Only element in the queue.
127 ASSERT(tail_ == NULL);
128 head_ = msg;
129 tail_ = msg;
130 } else {
131 ASSERT(tail_ != NULL);
132 if (!before_events) {
133 // Append at the tail.
134 tail_->next_ = msg;
135 tail_ = msg;
136 } else {
137 ASSERT(msg->dest_port() == Message::kIllegalPort);
138 if (head_->dest_port() != Message::kIllegalPort) {
139 msg->next_ = head_;
140 head_ = msg;
141 } else {
142 Message* cur = head_;
143 while (cur->next_ != NULL) {
144 if (cur->next_->dest_port() != Message::kIllegalPort) {
145 // Splice in the new message at the break.
146 msg->next_ = cur->next_;
147 cur->next_ = msg;
148 return;
149 }
150 cur = cur->next_;
151 }
152 // All pending messages are isolate library control messages. Append at
153 // the tail.
154 ASSERT(tail_ == cur);
155 ASSERT(tail_->dest_port() == Message::kIllegalPort);
156 tail_->next_ = msg;
157 tail_ = msg;
158 }
159 }
160 }
161}
162
163std::unique_ptr<Message> MessageQueue::Dequeue() {
164 Message* result = head_;
165 if (result != nullptr) {
166 head_ = result->next_;
167 // The following update to tail_ is not strictly needed.
168 if (head_ == nullptr) {
169 tail_ = nullptr;
170 }
171#if defined(DEBUG)
172 result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
173#endif // DEBUG
174 return std::unique_ptr<Message>(result);
175 }
176 return nullptr;
177}
178
179void MessageQueue::Clear() {
180 std::unique_ptr<Message> cur(head_);
181 head_ = nullptr;
182 tail_ = nullptr;
183 while (cur != nullptr) {
184 std::unique_ptr<Message> next(cur->next_);
185 if (cur->RedirectToDeliveryFailurePort()) {
186 PortMap::PostMessage(std::move(cur));
187 }
188 cur = std::move(next);
189 }
190}
191
192MessageQueue::Iterator::Iterator(const MessageQueue* queue) : next_(NULL) {
193 Reset(queue);
194}
195
196MessageQueue::Iterator::~Iterator() {}
197
198void MessageQueue::Iterator::Reset(const MessageQueue* queue) {
199 ASSERT(queue != NULL);
200 next_ = queue->head_;
201}
202
203// returns false when there are no more messages left.
204bool MessageQueue::Iterator::HasNext() {
205 return next_ != NULL;
206}
207
208// Returns the current message and moves forward.
209Message* MessageQueue::Iterator::Next() {
210 Message* current = next_;
211 next_ = next_->next_;
212 return current;
213}
214
215intptr_t MessageQueue::Length() const {
216 MessageQueue::Iterator it(this);
217 intptr_t length = 0;
218 while (it.HasNext()) {
219 it.Next();
220 length++;
221 }
222 return length;
223}
224
225Message* MessageQueue::FindMessageById(intptr_t id) {
226 MessageQueue::Iterator it(this);
227 while (it.HasNext()) {
228 Message* current = it.Next();
229 ASSERT(current != NULL);
230 if (current->Id() == id) {
231 return current;
232 }
233 }
234 return NULL;
235}
236
237void MessageQueue::PrintJSON(JSONStream* stream) {
238#ifndef PRODUCT
239 JSONArray messages(stream);
240
241 Object& msg_handler = Object::Handle();
242
243 MessageQueue::Iterator it(this);
244 intptr_t depth = 0;
245 while (it.HasNext()) {
246 Message* current = it.Next();
247 JSONObject message(&messages);
248 message.AddProperty("type", "Message");
249 message.AddPropertyF("name", "Isolate Message (%" Px ")", current->Id());
250 message.AddPropertyF("messageObjectId", "messages/%" Px "", current->Id());
251 message.AddProperty("size", current->Size());
252 message.AddProperty("index", depth++);
253 message.AddPropertyF("_destinationPort", "%" Pd64 "",
254 static_cast<int64_t>(current->dest_port()));
255 message.AddProperty("_priority",
256 Message::PriorityAsString(current->priority()));
257 // TODO(johnmccutchan): Move port -> handler map out of Dart and into the
258 // VM, that way we can lookup the handler without invoking Dart code.
259 msg_handler = DartLibraryCalls::LookupHandler(current->dest_port());
260 if (msg_handler.IsClosure()) {
261 // Grab function from closure.
262 msg_handler = Closure::Cast(msg_handler).function();
263 }
264 if (msg_handler.IsFunction()) {
265 const Function& function = Function::Cast(msg_handler);
266 message.AddProperty("handler", function);
267
268 const Script& script = Script::Handle(function.script());
269 if (!script.IsNull()) {
270 message.AddLocation(script, function.token_pos(),
271 function.end_token_pos());
272 }
273 }
274 }
275#endif // !PRODUCT
276}
277
278} // namespace dart
279