1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include <utility>
6
7#include "vm/message_handler.h"
8#include "vm/port.h"
9#include "vm/unit_test.h"
10
11namespace dart {
12
13class MessageHandlerTestPeer {
14 public:
15 explicit MessageHandlerTestPeer(MessageHandler* handler)
16 : handler_(handler) {}
17
18 void PostMessage(std::unique_ptr<Message> message) {
19 handler_->PostMessage(std::move(message));
20 }
21 void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
22 void CloseAllPorts() { handler_->CloseAllPorts(); }
23
24 void increment_live_ports() { handler_->increment_live_ports(); }
25 void decrement_live_ports() { handler_->decrement_live_ports(); }
26
27 MessageQueue* queue() const { return handler_->queue_; }
28 MessageQueue* oob_queue() const { return handler_->oob_queue_; }
29
30 private:
31 MessageHandler* handler_;
32
33 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer);
34};
35
36class TestMessageHandler : public MessageHandler {
37 public:
38 TestMessageHandler()
39 : port_buffer_(NULL),
40 port_buffer_size_(0),
41 notify_count_(0),
42 message_count_(0),
43 start_called_(false),
44 end_called_(false),
45 results_(NULL),
46 monitor_() {}
47
48 ~TestMessageHandler() {
49 PortMap::ClosePorts(this);
50 delete[] port_buffer_;
51 }
52
53 void MessageNotify(Message::Priority priority) {
54 MonitorLocker ml(&monitor_);
55 notify_count_++;
56 ml.Notify();
57 }
58
59 MessageStatus HandleMessage(std::unique_ptr<Message> message) {
60 // For testing purposes, keep a list of the ports
61 // for all messages we receive.
62 MonitorLocker ml(&monitor_);
63 AddPortToBuffer(message->dest_port());
64 message_count_++;
65 MessageStatus status = kOK;
66 if (results_ != NULL) {
67 status = results_[0];
68 results_++;
69 }
70 ml.Notify();
71 return status;
72 }
73
74 MessageStatus Start() {
75 start_called_ = true;
76 return kOK;
77 }
78
79 void End() {
80 MonitorLocker ml(&monitor_);
81 end_called_ = true;
82 AddPortToBuffer(-2);
83 ml.Notify();
84 }
85
86 Dart_Port* port_buffer() const { return port_buffer_; }
87 int notify_count() const { return notify_count_; }
88 int message_count() const { return message_count_; }
89 bool start_called() const { return start_called_; }
90 bool end_called() const { return end_called_; }
91
92 void set_results(MessageStatus* results) { results_ = results; }
93
94 Monitor* monitor() { return &monitor_; }
95
96 private:
97 void AddPortToBuffer(Dart_Port port) {
98 if (port_buffer_ == NULL) {
99 port_buffer_ = new Dart_Port[10];
100 port_buffer_size_ = 10;
101 } else if (message_count_ == port_buffer_size_) {
102 int new_port_buffer_size_ = 2 * port_buffer_size_;
103 Dart_Port* new_port_buffer_ = new Dart_Port[new_port_buffer_size_];
104 for (int i = 0; i < port_buffer_size_; i++) {
105 new_port_buffer_[i] = port_buffer_[i];
106 }
107 delete[] port_buffer_;
108 port_buffer_ = new_port_buffer_;
109 port_buffer_size_ = new_port_buffer_size_;
110 }
111 port_buffer_[message_count_] = port;
112 }
113
114 Dart_Port* port_buffer_;
115 int port_buffer_size_;
116 int notify_count_;
117 int message_count_;
118 bool start_called_;
119 bool end_called_;
120 MessageStatus* results_;
121 Monitor monitor_;
122
123 DISALLOW_COPY_AND_ASSIGN(TestMessageHandler);
124};
125
126MessageHandler::MessageStatus TestStartFunction(uword data) {
127 return (reinterpret_cast<TestMessageHandler*>(data))->Start();
128}
129
130void TestEndFunction(uword data) {
131 return (reinterpret_cast<TestMessageHandler*>(data))->End();
132}
133
134static std::unique_ptr<Message> BlankMessage(Dart_Port dest,
135 Message::Priority priority) {
136 return Message::New(dest, reinterpret_cast<uint8_t*>(malloc(1)), 1, nullptr,
137 priority);
138}
139
140VM_UNIT_TEST_CASE(MessageHandler_PostMessage) {
141 TestMessageHandler handler;
142 MessageHandlerTestPeer handler_peer(&handler);
143 EXPECT_EQ(0, handler.notify_count());
144
145 // Post a message.
146 std::unique_ptr<Message> message = BlankMessage(1, Message::kNormalPriority);
147 Message* raw_message = message.get();
148 handler_peer.PostMessage(std::move(message));
149
150 // The notify callback is called.
151 EXPECT_EQ(1, handler.notify_count());
152
153 // The message has been added to the correct queue.
154 EXPECT(raw_message == handler_peer.queue()->Dequeue().get());
155 EXPECT(nullptr == handler_peer.oob_queue()->Dequeue());
156
157 // Post an oob message.
158 message = BlankMessage(1, Message::kOOBPriority);
159 raw_message = message.get();
160 handler_peer.PostMessage(std::move(message));
161
162 // The notify callback is called.
163 EXPECT_EQ(2, handler.notify_count());
164
165 // The message has been added to the correct queue.
166 EXPECT(raw_message == handler_peer.oob_queue()->Dequeue().get());
167 EXPECT(nullptr == handler_peer.queue()->Dequeue());
168}
169
170VM_UNIT_TEST_CASE(MessageHandler_HasOOBMessages) {
171 TestMessageHandler handler;
172 MessageHandlerTestPeer handler_peer(&handler);
173
174 EXPECT(!handler.HasOOBMessages());
175
176 // Post a normal message.
177 std::unique_ptr<Message> message = BlankMessage(1, Message::kNormalPriority);
178 handler_peer.PostMessage(std::move(message));
179 EXPECT(!handler.HasOOBMessages());
180 {
181 // Acquire ownership of message handler queues, verify one regular message.
182 MessageHandler::AcquiredQueues aq(&handler);
183 EXPECT(aq.queue()->Length() == 1);
184 }
185
186 // Post an oob message.
187 message = BlankMessage(1, Message::kOOBPriority);
188 handler_peer.PostMessage(std::move(message));
189 EXPECT(handler.HasOOBMessages());
190 {
191 // Acquire ownership of message handler queues, verify one regular and one
192 // OOB message.
193 MessageHandler::AcquiredQueues aq(&handler);
194 EXPECT(aq.queue()->Length() == 1);
195 EXPECT(aq.oob_queue()->Length() == 1);
196 }
197
198 // Delete all pending messages.
199 handler_peer.CloseAllPorts();
200}
201
202VM_UNIT_TEST_CASE(MessageHandler_ClosePort) {
203 TestMessageHandler handler;
204 MessageHandlerTestPeer handler_peer(&handler);
205 std::unique_ptr<Message> message;
206 message = BlankMessage(1, Message::kNormalPriority);
207 Message* raw_message1 = message.get();
208 handler_peer.PostMessage(std::move(message));
209 message = BlankMessage(2, Message::kNormalPriority);
210 Message* raw_message2 = message.get();
211 handler_peer.PostMessage(std::move(message));
212
213 handler_peer.ClosePort(1);
214
215 // Closing the port does not drop the messages from the queue.
216 EXPECT(raw_message1 == handler_peer.queue()->Dequeue().get());
217 EXPECT(raw_message2 == handler_peer.queue()->Dequeue().get());
218}
219
220VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
221 TestMessageHandler handler;
222 MessageHandlerTestPeer handler_peer(&handler);
223 handler_peer.PostMessage(BlankMessage(1, Message::kNormalPriority));
224 handler_peer.PostMessage(BlankMessage(2, Message::kNormalPriority));
225
226 handler_peer.CloseAllPorts();
227
228 // All messages are dropped from the queue.
229 EXPECT(nullptr == handler_peer.queue()->Dequeue());
230}
231
232VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage) {
233 TestMessageHandler handler;
234 MessageHandlerTestPeer handler_peer(&handler);
235 Dart_Port port1 = PortMap::CreatePort(&handler);
236 Dart_Port port2 = PortMap::CreatePort(&handler);
237 Dart_Port port3 = PortMap::CreatePort(&handler);
238 handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
239 handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
240 handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority));
241 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
242
243 // We handle both oob messages and a single normal message.
244 EXPECT_EQ(MessageHandler::kOK, handler.HandleNextMessage());
245 EXPECT_EQ(3, handler.message_count());
246 Dart_Port* ports = handler.port_buffer();
247 EXPECT_EQ(port2, ports[0]);
248 EXPECT_EQ(port3, ports[1]);
249 EXPECT_EQ(port1, ports[2]);
250}
251
252VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_ProcessOOBAfterError) {
253 TestMessageHandler handler;
254 MessageHandler::MessageStatus results[] = {
255 MessageHandler::kError, // oob_message1
256 MessageHandler::kOK, // oob_message2
257 MessageHandler::kOK, // unused
258 };
259 handler.set_results(results);
260 MessageHandlerTestPeer handler_peer(&handler);
261 Dart_Port port1 = PortMap::CreatePort(&handler);
262 Dart_Port port2 = PortMap::CreatePort(&handler);
263 Dart_Port port3 = PortMap::CreatePort(&handler);
264 handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
265 handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
266 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
267
268 // When we get an error, we continue processing oob messages but
269 // stop handling normal messages.
270 EXPECT_EQ(MessageHandler::kError, handler.HandleNextMessage());
271 EXPECT_EQ(2, handler.message_count());
272 Dart_Port* ports = handler.port_buffer();
273 EXPECT_EQ(port2, ports[0]); // oob_message1, error
274 EXPECT_EQ(port3, ports[1]); // oob_message2, ok
275 handler_peer.CloseAllPorts();
276}
277
278VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) {
279 TestMessageHandler handler;
280 MessageHandler::MessageStatus results[] = {
281 MessageHandler::kOK, // oob_message1
282 MessageHandler::kShutdown, // oob_message2
283 MessageHandler::kOK, // unused
284 MessageHandler::kOK, // unused
285 };
286 handler.set_results(results);
287 MessageHandlerTestPeer handler_peer(&handler);
288 Dart_Port port1 = PortMap::CreatePort(&handler);
289 Dart_Port port2 = PortMap::CreatePort(&handler);
290 Dart_Port port3 = PortMap::CreatePort(&handler);
291 Dart_Port port4 = PortMap::CreatePort(&handler);
292 handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
293 handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
294 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
295 handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));
296
297 // When we get a shutdown message, we stop processing all messages.
298 EXPECT_EQ(MessageHandler::kShutdown, handler.HandleNextMessage());
299 EXPECT_EQ(2, handler.message_count());
300 Dart_Port* ports = handler.port_buffer();
301 EXPECT_EQ(port2, ports[0]); // oob_message1, ok
302 EXPECT_EQ(port3, ports[1]); // oob_message2, shutdown
303 {
304 // The oob queue has been cleared. oob_message3 is gone.
305 MessageHandler::AcquiredQueues aq(&handler);
306 EXPECT(aq.oob_queue()->Length() == 0);
307 }
308 handler_peer.CloseAllPorts();
309}
310
311VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
312 TestMessageHandler handler;
313 MessageHandlerTestPeer handler_peer(&handler);
314 Dart_Port port1 = PortMap::CreatePort(&handler);
315 Dart_Port port2 = PortMap::CreatePort(&handler);
316 Dart_Port port3 = PortMap::CreatePort(&handler);
317 Dart_Port port4 = PortMap::CreatePort(&handler);
318 handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
319 handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority));
320 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
321 handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));
322
323 // We handle both oob messages but no normal messages.
324 EXPECT_EQ(MessageHandler::kOK, handler.HandleOOBMessages());
325 EXPECT_EQ(2, handler.message_count());
326 Dart_Port* ports = handler.port_buffer();
327 EXPECT_EQ(port3, ports[0]);
328 EXPECT_EQ(port4, ports[1]);
329 handler_peer.CloseAllPorts();
330}
331
332struct ThreadStartInfo {
333 MessageHandler* handler;
334 Dart_Port* ports;
335 int count;
336 ThreadJoinId join_id;
337};
338
339static void SendMessages(uword param) {
340 ThreadStartInfo* info = reinterpret_cast<ThreadStartInfo*>(param);
341 info->join_id = OSThread::GetCurrentThreadJoinId(OSThread::Current());
342 MessageHandler* handler = info->handler;
343 MessageHandlerTestPeer handler_peer(handler);
344 for (int i = 0; i < info->count; i++) {
345 handler_peer.PostMessage(
346 BlankMessage(info->ports[i], Message::kNormalPriority));
347 }
348}
349
350VM_UNIT_TEST_CASE(MessageHandler_Run) {
351 TestMessageHandler handler;
352 ThreadPool pool;
353 MessageHandlerTestPeer handler_peer(&handler);
354
355 EXPECT(!handler.HasLivePorts());
356 handler_peer.increment_live_ports();
357
358 handler.Run(&pool, TestStartFunction, TestEndFunction,
359 reinterpret_cast<uword>(&handler));
360 Dart_Port port = PortMap::CreatePort(&handler);
361 handler_peer.PostMessage(BlankMessage(port, Message::kNormalPriority));
362
363 // Wait for the first message to be handled.
364 {
365 MonitorLocker ml(handler.monitor());
366 while (handler.message_count() < 1) {
367 ml.Wait();
368 }
369 EXPECT_EQ(1, handler.message_count());
370 EXPECT(handler.start_called());
371 EXPECT(!handler.end_called());
372 Dart_Port* handler_ports = handler.port_buffer();
373 EXPECT_EQ(port, handler_ports[0]);
374 }
375
376 // Start a thread which sends more messages.
377 Dart_Port ports[10];
378 for (int i = 0; i < 10; i++) {
379 ports[i] = PortMap::CreatePort(&handler);
380 }
381 ThreadStartInfo info;
382 info.handler = &handler;
383 info.ports = ports;
384 info.count = 10;
385 info.join_id = OSThread::kInvalidThreadJoinId;
386 OSThread::Start("SendMessages", SendMessages, reinterpret_cast<uword>(&info));
387
388 // Wait for the messages to be handled.
389 {
390 MonitorLocker ml(handler.monitor());
391 while (handler.message_count() < 11) {
392 ml.Wait();
393 }
394 Dart_Port* handler_ports = handler.port_buffer();
395 EXPECT_EQ(11, handler.message_count());
396 EXPECT(handler.start_called());
397 EXPECT(!handler.end_called());
398 EXPECT_EQ(port, handler_ports[0]);
399 for (int i = 1; i < 11; i++) {
400 EXPECT_EQ(ports[i - 1], handler_ports[i]);
401 }
402 handler_peer.decrement_live_ports();
403 EXPECT(!handler.HasLivePorts());
404 }
405
406 // Must join the thread or the VM shutdown is racing with any VM state the
407 // thread touched.
408 ASSERT(info.join_id != OSThread::kInvalidThreadJoinId);
409 OSThread::Join(info.join_id);
410}
411
412} // namespace dart
413