1 | // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 | // for details. All rights reserved. Use of this source code is governed by a |
3 | // BSD-style license that can be found in the LICENSE file. |
4 | |
5 | #include <utility> |
6 | |
7 | #include "vm/message_handler.h" |
8 | #include "vm/port.h" |
9 | #include "vm/unit_test.h" |
10 | |
11 | namespace dart { |
12 | |
13 | class MessageHandlerTestPeer { |
14 | public: |
15 | explicit MessageHandlerTestPeer(MessageHandler* handler) |
16 | : handler_(handler) {} |
17 | |
18 | void PostMessage(std::unique_ptr<Message> message) { |
19 | handler_->PostMessage(std::move(message)); |
20 | } |
21 | void ClosePort(Dart_Port port) { handler_->ClosePort(port); } |
22 | void CloseAllPorts() { handler_->CloseAllPorts(); } |
23 | |
24 | void increment_live_ports() { handler_->increment_live_ports(); } |
25 | void decrement_live_ports() { handler_->decrement_live_ports(); } |
26 | |
27 | MessageQueue* queue() const { return handler_->queue_; } |
28 | MessageQueue* oob_queue() const { return handler_->oob_queue_; } |
29 | |
30 | private: |
31 | MessageHandler* handler_; |
32 | |
33 | DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer); |
34 | }; |
35 | |
36 | class TestMessageHandler : public MessageHandler { |
37 | public: |
38 | TestMessageHandler() |
39 | : port_buffer_(NULL), |
40 | port_buffer_size_(0), |
41 | notify_count_(0), |
42 | message_count_(0), |
43 | start_called_(false), |
44 | end_called_(false), |
45 | results_(NULL), |
46 | monitor_() {} |
47 | |
48 | ~TestMessageHandler() { |
49 | PortMap::ClosePorts(this); |
50 | delete[] port_buffer_; |
51 | } |
52 | |
53 | void MessageNotify(Message::Priority priority) { |
54 | MonitorLocker ml(&monitor_); |
55 | notify_count_++; |
56 | ml.Notify(); |
57 | } |
58 | |
59 | MessageStatus HandleMessage(std::unique_ptr<Message> message) { |
60 | // For testing purposes, keep a list of the ports |
61 | // for all messages we receive. |
62 | MonitorLocker ml(&monitor_); |
63 | AddPortToBuffer(message->dest_port()); |
64 | message_count_++; |
65 | MessageStatus status = kOK; |
66 | if (results_ != NULL) { |
67 | status = results_[0]; |
68 | results_++; |
69 | } |
70 | ml.Notify(); |
71 | return status; |
72 | } |
73 | |
74 | MessageStatus Start() { |
75 | start_called_ = true; |
76 | return kOK; |
77 | } |
78 | |
79 | void End() { |
80 | MonitorLocker ml(&monitor_); |
81 | end_called_ = true; |
82 | AddPortToBuffer(-2); |
83 | ml.Notify(); |
84 | } |
85 | |
86 | Dart_Port* port_buffer() const { return port_buffer_; } |
87 | int notify_count() const { return notify_count_; } |
88 | int message_count() const { return message_count_; } |
89 | bool start_called() const { return start_called_; } |
90 | bool end_called() const { return end_called_; } |
91 | |
92 | void set_results(MessageStatus* results) { results_ = results; } |
93 | |
94 | Monitor* monitor() { return &monitor_; } |
95 | |
96 | private: |
97 | void AddPortToBuffer(Dart_Port port) { |
98 | if (port_buffer_ == NULL) { |
99 | port_buffer_ = new Dart_Port[10]; |
100 | port_buffer_size_ = 10; |
101 | } else if (message_count_ == port_buffer_size_) { |
102 | int new_port_buffer_size_ = 2 * port_buffer_size_; |
103 | Dart_Port* new_port_buffer_ = new Dart_Port[new_port_buffer_size_]; |
104 | for (int i = 0; i < port_buffer_size_; i++) { |
105 | new_port_buffer_[i] = port_buffer_[i]; |
106 | } |
107 | delete[] port_buffer_; |
108 | port_buffer_ = new_port_buffer_; |
109 | port_buffer_size_ = new_port_buffer_size_; |
110 | } |
111 | port_buffer_[message_count_] = port; |
112 | } |
113 | |
114 | Dart_Port* port_buffer_; |
115 | int port_buffer_size_; |
116 | int notify_count_; |
117 | int message_count_; |
118 | bool start_called_; |
119 | bool end_called_; |
120 | MessageStatus* results_; |
121 | Monitor monitor_; |
122 | |
123 | DISALLOW_COPY_AND_ASSIGN(TestMessageHandler); |
124 | }; |
125 | |
126 | MessageHandler::MessageStatus TestStartFunction(uword data) { |
127 | return (reinterpret_cast<TestMessageHandler*>(data))->Start(); |
128 | } |
129 | |
130 | void TestEndFunction(uword data) { |
131 | return (reinterpret_cast<TestMessageHandler*>(data))->End(); |
132 | } |
133 | |
134 | static std::unique_ptr<Message> BlankMessage(Dart_Port dest, |
135 | Message::Priority priority) { |
136 | return Message::New(dest, reinterpret_cast<uint8_t*>(malloc(1)), 1, nullptr, |
137 | priority); |
138 | } |
139 | |
140 | VM_UNIT_TEST_CASE(MessageHandler_PostMessage) { |
141 | TestMessageHandler handler; |
142 | MessageHandlerTestPeer handler_peer(&handler); |
143 | EXPECT_EQ(0, handler.notify_count()); |
144 | |
145 | // Post a message. |
146 | std::unique_ptr<Message> message = BlankMessage(1, Message::kNormalPriority); |
147 | Message* raw_message = message.get(); |
148 | handler_peer.PostMessage(std::move(message)); |
149 | |
150 | // The notify callback is called. |
151 | EXPECT_EQ(1, handler.notify_count()); |
152 | |
153 | // The message has been added to the correct queue. |
154 | EXPECT(raw_message == handler_peer.queue()->Dequeue().get()); |
155 | EXPECT(nullptr == handler_peer.oob_queue()->Dequeue()); |
156 | |
157 | // Post an oob message. |
158 | message = BlankMessage(1, Message::kOOBPriority); |
159 | raw_message = message.get(); |
160 | handler_peer.PostMessage(std::move(message)); |
161 | |
162 | // The notify callback is called. |
163 | EXPECT_EQ(2, handler.notify_count()); |
164 | |
165 | // The message has been added to the correct queue. |
166 | EXPECT(raw_message == handler_peer.oob_queue()->Dequeue().get()); |
167 | EXPECT(nullptr == handler_peer.queue()->Dequeue()); |
168 | } |
169 | |
170 | VM_UNIT_TEST_CASE(MessageHandler_HasOOBMessages) { |
171 | TestMessageHandler handler; |
172 | MessageHandlerTestPeer handler_peer(&handler); |
173 | |
174 | EXPECT(!handler.HasOOBMessages()); |
175 | |
176 | // Post a normal message. |
177 | std::unique_ptr<Message> message = BlankMessage(1, Message::kNormalPriority); |
178 | handler_peer.PostMessage(std::move(message)); |
179 | EXPECT(!handler.HasOOBMessages()); |
180 | { |
181 | // Acquire ownership of message handler queues, verify one regular message. |
182 | MessageHandler::AcquiredQueues aq(&handler); |
183 | EXPECT(aq.queue()->Length() == 1); |
184 | } |
185 | |
186 | // Post an oob message. |
187 | message = BlankMessage(1, Message::kOOBPriority); |
188 | handler_peer.PostMessage(std::move(message)); |
189 | EXPECT(handler.HasOOBMessages()); |
190 | { |
191 | // Acquire ownership of message handler queues, verify one regular and one |
192 | // OOB message. |
193 | MessageHandler::AcquiredQueues aq(&handler); |
194 | EXPECT(aq.queue()->Length() == 1); |
195 | EXPECT(aq.oob_queue()->Length() == 1); |
196 | } |
197 | |
198 | // Delete all pending messages. |
199 | handler_peer.CloseAllPorts(); |
200 | } |
201 | |
202 | VM_UNIT_TEST_CASE(MessageHandler_ClosePort) { |
203 | TestMessageHandler handler; |
204 | MessageHandlerTestPeer handler_peer(&handler); |
205 | std::unique_ptr<Message> message; |
206 | message = BlankMessage(1, Message::kNormalPriority); |
207 | Message* raw_message1 = message.get(); |
208 | handler_peer.PostMessage(std::move(message)); |
209 | message = BlankMessage(2, Message::kNormalPriority); |
210 | Message* raw_message2 = message.get(); |
211 | handler_peer.PostMessage(std::move(message)); |
212 | |
213 | handler_peer.ClosePort(1); |
214 | |
215 | // Closing the port does not drop the messages from the queue. |
216 | EXPECT(raw_message1 == handler_peer.queue()->Dequeue().get()); |
217 | EXPECT(raw_message2 == handler_peer.queue()->Dequeue().get()); |
218 | } |
219 | |
220 | VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) { |
221 | TestMessageHandler handler; |
222 | MessageHandlerTestPeer handler_peer(&handler); |
223 | handler_peer.PostMessage(BlankMessage(1, Message::kNormalPriority)); |
224 | handler_peer.PostMessage(BlankMessage(2, Message::kNormalPriority)); |
225 | |
226 | handler_peer.CloseAllPorts(); |
227 | |
228 | // All messages are dropped from the queue. |
229 | EXPECT(nullptr == handler_peer.queue()->Dequeue()); |
230 | } |
231 | |
232 | VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage) { |
233 | TestMessageHandler handler; |
234 | MessageHandlerTestPeer handler_peer(&handler); |
235 | Dart_Port port1 = PortMap::CreatePort(&handler); |
236 | Dart_Port port2 = PortMap::CreatePort(&handler); |
237 | Dart_Port port3 = PortMap::CreatePort(&handler); |
238 | handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority)); |
239 | handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority)); |
240 | handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority)); |
241 | handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority)); |
242 | |
243 | // We handle both oob messages and a single normal message. |
244 | EXPECT_EQ(MessageHandler::kOK, handler.HandleNextMessage()); |
245 | EXPECT_EQ(3, handler.message_count()); |
246 | Dart_Port* ports = handler.port_buffer(); |
247 | EXPECT_EQ(port2, ports[0]); |
248 | EXPECT_EQ(port3, ports[1]); |
249 | EXPECT_EQ(port1, ports[2]); |
250 | } |
251 | |
252 | VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_ProcessOOBAfterError) { |
253 | TestMessageHandler handler; |
254 | MessageHandler::MessageStatus results[] = { |
255 | MessageHandler::kError, // oob_message1 |
256 | MessageHandler::kOK, // oob_message2 |
257 | MessageHandler::kOK, // unused |
258 | }; |
259 | handler.set_results(results); |
260 | MessageHandlerTestPeer handler_peer(&handler); |
261 | Dart_Port port1 = PortMap::CreatePort(&handler); |
262 | Dart_Port port2 = PortMap::CreatePort(&handler); |
263 | Dart_Port port3 = PortMap::CreatePort(&handler); |
264 | handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority)); |
265 | handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority)); |
266 | handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority)); |
267 | |
268 | // When we get an error, we continue processing oob messages but |
269 | // stop handling normal messages. |
270 | EXPECT_EQ(MessageHandler::kError, handler.HandleNextMessage()); |
271 | EXPECT_EQ(2, handler.message_count()); |
272 | Dart_Port* ports = handler.port_buffer(); |
273 | EXPECT_EQ(port2, ports[0]); // oob_message1, error |
274 | EXPECT_EQ(port3, ports[1]); // oob_message2, ok |
275 | handler_peer.CloseAllPorts(); |
276 | } |
277 | |
278 | VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) { |
279 | TestMessageHandler handler; |
280 | MessageHandler::MessageStatus results[] = { |
281 | MessageHandler::kOK, // oob_message1 |
282 | MessageHandler::kShutdown, // oob_message2 |
283 | MessageHandler::kOK, // unused |
284 | MessageHandler::kOK, // unused |
285 | }; |
286 | handler.set_results(results); |
287 | MessageHandlerTestPeer handler_peer(&handler); |
288 | Dart_Port port1 = PortMap::CreatePort(&handler); |
289 | Dart_Port port2 = PortMap::CreatePort(&handler); |
290 | Dart_Port port3 = PortMap::CreatePort(&handler); |
291 | Dart_Port port4 = PortMap::CreatePort(&handler); |
292 | handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority)); |
293 | handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority)); |
294 | handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority)); |
295 | handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority)); |
296 | |
297 | // When we get a shutdown message, we stop processing all messages. |
298 | EXPECT_EQ(MessageHandler::kShutdown, handler.HandleNextMessage()); |
299 | EXPECT_EQ(2, handler.message_count()); |
300 | Dart_Port* ports = handler.port_buffer(); |
301 | EXPECT_EQ(port2, ports[0]); // oob_message1, ok |
302 | EXPECT_EQ(port3, ports[1]); // oob_message2, shutdown |
303 | { |
304 | // The oob queue has been cleared. oob_message3 is gone. |
305 | MessageHandler::AcquiredQueues aq(&handler); |
306 | EXPECT(aq.oob_queue()->Length() == 0); |
307 | } |
308 | handler_peer.CloseAllPorts(); |
309 | } |
310 | |
311 | VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) { |
312 | TestMessageHandler handler; |
313 | MessageHandlerTestPeer handler_peer(&handler); |
314 | Dart_Port port1 = PortMap::CreatePort(&handler); |
315 | Dart_Port port2 = PortMap::CreatePort(&handler); |
316 | Dart_Port port3 = PortMap::CreatePort(&handler); |
317 | Dart_Port port4 = PortMap::CreatePort(&handler); |
318 | handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority)); |
319 | handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority)); |
320 | handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority)); |
321 | handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority)); |
322 | |
323 | // We handle both oob messages but no normal messages. |
324 | EXPECT_EQ(MessageHandler::kOK, handler.HandleOOBMessages()); |
325 | EXPECT_EQ(2, handler.message_count()); |
326 | Dart_Port* ports = handler.port_buffer(); |
327 | EXPECT_EQ(port3, ports[0]); |
328 | EXPECT_EQ(port4, ports[1]); |
329 | handler_peer.CloseAllPorts(); |
330 | } |
331 | |
332 | struct ThreadStartInfo { |
333 | MessageHandler* handler; |
334 | Dart_Port* ports; |
335 | int count; |
336 | ThreadJoinId join_id; |
337 | }; |
338 | |
339 | static void SendMessages(uword param) { |
340 | ThreadStartInfo* info = reinterpret_cast<ThreadStartInfo*>(param); |
341 | info->join_id = OSThread::GetCurrentThreadJoinId(OSThread::Current()); |
342 | MessageHandler* handler = info->handler; |
343 | MessageHandlerTestPeer handler_peer(handler); |
344 | for (int i = 0; i < info->count; i++) { |
345 | handler_peer.PostMessage( |
346 | BlankMessage(info->ports[i], Message::kNormalPriority)); |
347 | } |
348 | } |
349 | |
350 | VM_UNIT_TEST_CASE(MessageHandler_Run) { |
351 | TestMessageHandler handler; |
352 | ThreadPool pool; |
353 | MessageHandlerTestPeer handler_peer(&handler); |
354 | |
355 | EXPECT(!handler.HasLivePorts()); |
356 | handler_peer.increment_live_ports(); |
357 | |
358 | handler.Run(&pool, TestStartFunction, TestEndFunction, |
359 | reinterpret_cast<uword>(&handler)); |
360 | Dart_Port port = PortMap::CreatePort(&handler); |
361 | handler_peer.PostMessage(BlankMessage(port, Message::kNormalPriority)); |
362 | |
363 | // Wait for the first message to be handled. |
364 | { |
365 | MonitorLocker ml(handler.monitor()); |
366 | while (handler.message_count() < 1) { |
367 | ml.Wait(); |
368 | } |
369 | EXPECT_EQ(1, handler.message_count()); |
370 | EXPECT(handler.start_called()); |
371 | EXPECT(!handler.end_called()); |
372 | Dart_Port* handler_ports = handler.port_buffer(); |
373 | EXPECT_EQ(port, handler_ports[0]); |
374 | } |
375 | |
376 | // Start a thread which sends more messages. |
377 | Dart_Port ports[10]; |
378 | for (int i = 0; i < 10; i++) { |
379 | ports[i] = PortMap::CreatePort(&handler); |
380 | } |
381 | ThreadStartInfo info; |
382 | info.handler = &handler; |
383 | info.ports = ports; |
384 | info.count = 10; |
385 | info.join_id = OSThread::kInvalidThreadJoinId; |
386 | OSThread::Start("SendMessages" , SendMessages, reinterpret_cast<uword>(&info)); |
387 | |
388 | // Wait for the messages to be handled. |
389 | { |
390 | MonitorLocker ml(handler.monitor()); |
391 | while (handler.message_count() < 11) { |
392 | ml.Wait(); |
393 | } |
394 | Dart_Port* handler_ports = handler.port_buffer(); |
395 | EXPECT_EQ(11, handler.message_count()); |
396 | EXPECT(handler.start_called()); |
397 | EXPECT(!handler.end_called()); |
398 | EXPECT_EQ(port, handler_ports[0]); |
399 | for (int i = 1; i < 11; i++) { |
400 | EXPECT_EQ(ports[i - 1], handler_ports[i]); |
401 | } |
402 | handler_peer.decrement_live_ports(); |
403 | EXPECT(!handler.HasLivePorts()); |
404 | } |
405 | |
406 | // Must join the thread or the VM shutdown is racing with any VM state the |
407 | // thread touched. |
408 | ASSERT(info.join_id != OSThread::kInvalidThreadJoinId); |
409 | OSThread::Join(info.join_id); |
410 | } |
411 | |
412 | } // namespace dart |
413 | |