1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/io/async/NotificationQueue.h> |
18 | |
19 | #include <sys/types.h> |
20 | |
21 | #include <iostream> |
22 | #include <list> |
23 | #include <thread> |
24 | |
25 | #include <folly/io/async/ScopedEventBaseThread.h> |
26 | #include <folly/portability/GTest.h> |
27 | #include <folly/synchronization/Baton.h> |
28 | |
29 | #ifndef _WIN32 |
30 | #include <sys/wait.h> |
31 | #endif |
32 | |
33 | using namespace std; |
34 | using namespace folly; |
35 | |
36 | typedef NotificationQueue<int> IntQueue; |
37 | |
38 | class QueueConsumer : public IntQueue::Consumer { |
39 | public: |
40 | QueueConsumer() {} |
41 | |
42 | void messageAvailable(int&& value) noexcept override { |
43 | messages.push_back(value); |
44 | if (fn) { |
45 | fn(value); |
46 | } |
47 | } |
48 | |
49 | std::function<void(int)> fn; |
50 | std::deque<int> messages; |
51 | }; |
52 | |
53 | class QueueTest { |
54 | public: |
55 | explicit QueueTest(uint32_t maxSize, IntQueue::FdType type) |
56 | : queue(maxSize, type), terminationQueue(maxSize, type) {} |
57 | |
58 | void sendOne(); |
59 | void putMessages(); |
60 | void multiConsumer(); |
61 | void maxQueueSize(); |
62 | void maxReadAtOnce(); |
63 | void destroyCallback(); |
64 | void useAfterFork(); |
65 | |
66 | IntQueue queue; |
67 | IntQueue terminationQueue; |
68 | }; |
69 | |
70 | void QueueTest::sendOne() { |
71 | // Create a notification queue and a callback in this thread |
72 | EventBase eventBase; |
73 | |
74 | QueueConsumer consumer; |
75 | consumer.fn = [&](int) { |
76 | // Stop consuming after we receive 1 message |
77 | consumer.stopConsuming(); |
78 | }; |
79 | consumer.startConsuming(&eventBase, &queue); |
80 | |
81 | // Start a new EventBase thread to put a message on our queue |
82 | ScopedEventBaseThread t1; |
83 | t1.getEventBase()->runInEventBaseThread([&] { this->queue.putMessage(5); }); |
84 | |
85 | // Loop until we receive the message |
86 | eventBase.loop(); |
87 | |
88 | const auto& messages = consumer.messages; |
89 | EXPECT_EQ(1, messages.size()); |
90 | EXPECT_EQ(5, messages.at(0)); |
91 | } |
92 | |
93 | void QueueTest::putMessages() { |
94 | EventBase eventBase; |
95 | |
96 | QueueConsumer consumer; |
97 | QueueConsumer consumer2; |
98 | consumer.fn = [&](int msg) { |
99 | // Stop consuming after we receive a message with value 0, and start |
100 | // consumer2 |
101 | if (msg == 0) { |
102 | consumer.stopConsuming(); |
103 | consumer2.startConsuming(&eventBase, &queue); |
104 | } |
105 | }; |
106 | consumer2.fn = [&](int msg) { |
107 | // Stop consuming after we receive a message with value 0 |
108 | if (msg == 0) { |
109 | consumer2.stopConsuming(); |
110 | } |
111 | }; |
112 | consumer.startConsuming(&eventBase, &queue); |
113 | |
114 | list<int> msgList = {1, 2, 3, 4}; |
115 | vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0}; |
116 | // Call putMessages() several times to add messages to the queue |
117 | queue.putMessages(msgList.begin(), msgList.end()); |
118 | queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4); |
119 | // Test sending 17 messages, the pipe-based queue calls write in 16 byte |
120 | // chunks |
121 | queue.putMessages(msgVector.begin(), msgVector.end()); |
122 | |
123 | // Loop until the consumer has stopped |
124 | eventBase.loop(); |
125 | |
126 | vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0}; |
127 | vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0}; |
128 | EXPECT_EQ(expectedMessages.size(), consumer.messages.size()); |
129 | for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) { |
130 | EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx)); |
131 | } |
132 | EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size()); |
133 | for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) { |
134 | EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx)); |
135 | } |
136 | } |
137 | |
138 | void QueueTest::multiConsumer() { |
139 | uint32_t numConsumers = 8; |
140 | uint32_t numMessages = 10000; |
141 | |
142 | // Create several consumers each running in their own EventBase thread |
143 | vector<QueueConsumer> consumers(numConsumers); |
144 | vector<ScopedEventBaseThread> threads(numConsumers); |
145 | |
146 | for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) { |
147 | QueueConsumer* consumer = &consumers[consumerIdx]; |
148 | |
149 | consumer->fn = [consumer, consumerIdx, this](int value) { |
150 | // Treat 0 as a signal to stop. |
151 | if (value == 0) { |
152 | consumer->stopConsuming(); |
153 | // Put a message on the terminationQueue to indicate we have stopped |
154 | terminationQueue.putMessage(consumerIdx); |
155 | } |
156 | }; |
157 | |
158 | EventBase* eventBase = threads[consumerIdx].getEventBase(); |
159 | eventBase->runInEventBaseThread([eventBase, consumer, this] { |
160 | consumer->startConsuming(eventBase, &queue); |
161 | }); |
162 | } |
163 | |
164 | // Now add a number of messages from this thread |
165 | // Start at 1 rather than 0, since 0 is the signal to stop. |
166 | for (uint32_t n = 1; n < numMessages; ++n) { |
167 | queue.putMessage(n); |
168 | } |
169 | // Now add a 0 for each consumer, to signal them to stop |
170 | for (uint32_t n = 0; n < numConsumers; ++n) { |
171 | queue.putMessage(0); |
172 | } |
173 | |
174 | // Wait until we get notified that all of the consumers have stopped |
175 | // We use a separate notification queue for this. |
176 | QueueConsumer terminationConsumer; |
177 | vector<uint32_t> (numConsumers, 0); |
178 | uint32_t consumersRemaining = numConsumers; |
179 | terminationConsumer.fn = [&](int consumerIdx) { |
180 | --consumersRemaining; |
181 | if (consumersRemaining == 0) { |
182 | terminationConsumer.stopConsuming(); |
183 | } |
184 | |
185 | EXPECT_GE(consumerIdx, 0); |
186 | EXPECT_LT(consumerIdx, numConsumers); |
187 | ++consumersStopped[consumerIdx]; |
188 | }; |
189 | EventBase eventBase; |
190 | terminationConsumer.startConsuming(&eventBase, &terminationQueue); |
191 | eventBase.loop(); |
192 | |
193 | // Verify that we saw exactly 1 stop message for each consumer |
194 | for (uint32_t n = 0; n < numConsumers; ++n) { |
195 | EXPECT_EQ(1, consumersStopped[n]); |
196 | } |
197 | |
198 | // Validate that every message sent to the main queue was received exactly |
199 | // once. |
200 | vector<int> messageCount(numMessages, 0); |
201 | for (uint32_t n = 0; n < numConsumers; ++n) { |
202 | for (int msg : consumers[n].messages) { |
203 | EXPECT_GE(msg, 0); |
204 | EXPECT_LT(msg, numMessages); |
205 | ++messageCount[msg]; |
206 | } |
207 | } |
208 | |
209 | // 0 is the signal to stop, and should have been received once by each |
210 | // consumer |
211 | EXPECT_EQ(numConsumers, messageCount[0]); |
212 | // All other messages should have been received exactly once |
213 | for (uint32_t n = 1; n < numMessages; ++n) { |
214 | EXPECT_EQ(1, messageCount[n]); |
215 | } |
216 | } |
217 | |
218 | void QueueTest::maxQueueSize() { |
219 | // Create a queue with a maximum size of 5, and fill it up |
220 | |
221 | for (int n = 0; n < 5; ++n) { |
222 | queue.tryPutMessage(n); |
223 | } |
224 | |
225 | // Calling tryPutMessage() now should fail |
226 | EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error); |
227 | |
228 | EXPECT_FALSE(queue.tryPutMessageNoThrow(5)); |
229 | int val = 5; |
230 | EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val))); |
231 | |
232 | // Pop a message from the queue |
233 | int result = -1; |
234 | EXPECT_TRUE(queue.tryConsume(result)); |
235 | EXPECT_EQ(0, result); |
236 | |
237 | // We should be able to write another message now that we popped one off. |
238 | queue.tryPutMessage(5); |
239 | // But now we are full again. |
240 | EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error); |
241 | // putMessage() should let us exceed the maximum |
242 | queue.putMessage(6); |
243 | |
244 | // Pull another mesage off |
245 | EXPECT_TRUE(queue.tryConsume(result)); |
246 | EXPECT_EQ(1, result); |
247 | |
248 | // tryPutMessage() should still fail since putMessage() actually put us over |
249 | // the max. |
250 | EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error); |
251 | |
252 | // Pull another message off and try again |
253 | EXPECT_TRUE(queue.tryConsume(result)); |
254 | EXPECT_EQ(2, result); |
255 | queue.tryPutMessage(7); |
256 | |
257 | // Now pull all the remaining messages off |
258 | EXPECT_TRUE(queue.tryConsume(result)); |
259 | EXPECT_EQ(3, result); |
260 | EXPECT_TRUE(queue.tryConsume(result)); |
261 | EXPECT_EQ(4, result); |
262 | EXPECT_TRUE(queue.tryConsume(result)); |
263 | EXPECT_EQ(5, result); |
264 | EXPECT_TRUE(queue.tryConsume(result)); |
265 | EXPECT_EQ(6, result); |
266 | EXPECT_TRUE(queue.tryConsume(result)); |
267 | EXPECT_EQ(7, result); |
268 | |
269 | // There should be no messages left |
270 | result = -1; |
271 | EXPECT_TRUE(!queue.tryConsume(result)); |
272 | EXPECT_EQ(-1, result); |
273 | } |
274 | |
275 | void QueueTest::maxReadAtOnce() { |
276 | // Add 100 messages to the queue |
277 | for (int n = 0; n < 100; ++n) { |
278 | queue.putMessage(n); |
279 | } |
280 | |
281 | EventBase eventBase; |
282 | |
283 | // Record how many messages were processed each loop iteration. |
284 | uint32_t messagesThisLoop = 0; |
285 | std::vector<uint32_t> messagesPerLoop; |
286 | std::function<void()> loopFinished = [&] { |
287 | // Record the current number of messages read this loop |
288 | messagesPerLoop.push_back(messagesThisLoop); |
289 | // Reset messagesThisLoop to 0 for the next loop |
290 | messagesThisLoop = 0; |
291 | |
292 | // To prevent use-after-free bugs when eventBase destructs, |
293 | // prevent calling runInLoop any more after the test is finished. |
294 | // 55 == number of times loop should run. |
295 | if (messagesPerLoop.size() != 55) { |
296 | // Reschedule ourself to run at the end of the next loop |
297 | eventBase.runInLoop(loopFinished); |
298 | } |
299 | }; |
300 | // Schedule the first call to loopFinished |
301 | eventBase.runInLoop(loopFinished); |
302 | |
303 | QueueConsumer consumer; |
304 | // Read the first 50 messages 10 at a time. |
305 | consumer.setMaxReadAtOnce(10); |
306 | consumer.fn = [&](int value) { |
307 | ++messagesThisLoop; |
308 | // After 50 messages, drop to reading only 1 message at a time. |
309 | if (value == 50) { |
310 | consumer.setMaxReadAtOnce(1); |
311 | } |
312 | // Terminate the loop when we reach the end of the messages. |
313 | if (value == 99) { |
314 | eventBase.terminateLoopSoon(); |
315 | } |
316 | }; |
317 | consumer.startConsuming(&eventBase, &queue); |
318 | |
319 | // Run the event loop until the consumer terminates it |
320 | eventBase.loop(); |
321 | |
322 | // The consumer should have read all 100 messages in order |
323 | EXPECT_EQ(100, consumer.messages.size()); |
324 | for (int n = 0; n < 100; ++n) { |
325 | EXPECT_EQ(n, consumer.messages.at(n)); |
326 | } |
327 | |
328 | // Currently EventBase happens to still run the loop callbacks even after |
329 | // terminateLoopSoon() is called. However, we don't really want to depend on |
330 | // this behavior. In case this ever changes in the future, add |
331 | // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the |
332 | // last loop iteration. |
333 | if (messagesThisLoop > 0) { |
334 | messagesPerLoop.push_back(messagesThisLoop); |
335 | messagesThisLoop = 0; |
336 | } |
337 | |
338 | // For the first 5 loops it should have read 10 messages each time. |
339 | // After that it should have read 1 messages per loop for the next 50 loops. |
340 | EXPECT_EQ(55, messagesPerLoop.size()); |
341 | for (int n = 0; n < 5; ++n) { |
342 | EXPECT_EQ(10, messagesPerLoop.at(n)); |
343 | } |
344 | for (int n = 5; n < 55; ++n) { |
345 | EXPECT_EQ(1, messagesPerLoop.at(n)); |
346 | } |
347 | } |
348 | |
349 | void QueueTest::destroyCallback() { |
350 | // Rather than using QueueConsumer, define a separate class for the destroy |
351 | // test. The DestroyTestConsumer will delete itself inside the |
352 | // messageAvailable() callback. With a regular QueueConsumer this would |
353 | // destroy the std::function object while the function is running, which we |
354 | // should probably avoid doing. This uses a pointer to a std::function to |
355 | // avoid destroying the function object. |
356 | class DestroyTestConsumer : public IntQueue::Consumer { |
357 | public: |
358 | void messageAvailable(int&& value) noexcept override { |
359 | DestructorGuard g(this); |
360 | if (fn && *fn) { |
361 | (*fn)(value); |
362 | } |
363 | } |
364 | |
365 | std::function<void(int)>* fn; |
366 | |
367 | protected: |
368 | ~DestroyTestConsumer() override = default; |
369 | }; |
370 | |
371 | EventBase eventBase; |
372 | // Create a queue and add 2 messages to it |
373 | queue.putMessage(1); |
374 | queue.putMessage(2); |
375 | |
376 | // Create two QueueConsumers allocated on the heap. |
377 | // Have whichever one gets called first destroy both of the QueueConsumers. |
378 | // This way one consumer will be destroyed from inside its messageAvailable() |
379 | // callback, and one consume will be destroyed when it isn't inside |
380 | // messageAvailable(). |
381 | std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor> |
382 | consumer1(new DestroyTestConsumer); |
383 | std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor> |
384 | consumer2(new DestroyTestConsumer); |
385 | std::function<void(int)> fn = [&](int) { |
386 | consumer1 = nullptr; |
387 | consumer2 = nullptr; |
388 | }; |
389 | consumer1->fn = &fn; |
390 | consumer2->fn = &fn; |
391 | |
392 | consumer1->startConsuming(&eventBase, &queue); |
393 | consumer2->startConsuming(&eventBase, &queue); |
394 | |
395 | // Run the event loop. |
396 | eventBase.loop(); |
397 | |
398 | // One of the consumers should have fired, received the message, |
399 | // then destroyed both consumers. |
400 | EXPECT_TRUE(!consumer1); |
401 | EXPECT_TRUE(!consumer2); |
402 | // One message should be left in the queue |
403 | int result = 1; |
404 | EXPECT_TRUE(queue.tryConsume(result)); |
405 | EXPECT_EQ(2, result); |
406 | } |
407 | |
408 | TEST(NotificationQueueTest, ConsumeUntilDrained) { |
409 | // Basic tests: make sure we |
410 | // - drain all the messages |
411 | // - ignore any maxReadAtOnce |
412 | // - can't add messages during draining |
413 | EventBase eventBase; |
414 | IntQueue queue; |
415 | QueueConsumer consumer; |
416 | consumer.fn = [&](int i) { |
417 | EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error); |
418 | EXPECT_FALSE(queue.tryPutMessageNoThrow(i)); |
419 | EXPECT_THROW(queue.putMessage(i), std::runtime_error); |
420 | std::vector<int> ints{1, 2, 3}; |
421 | EXPECT_THROW( |
422 | queue.putMessages(ints.begin(), ints.end()), std::runtime_error); |
423 | }; |
424 | consumer.setMaxReadAtOnce(10); // We should ignore this |
425 | consumer.startConsuming(&eventBase, &queue); |
426 | for (int i = 0; i < 20; i++) { |
427 | queue.putMessage(i); |
428 | } |
429 | EXPECT_TRUE(consumer.consumeUntilDrained()); |
430 | EXPECT_EQ(20, consumer.messages.size()); |
431 | |
432 | // Make sure there can only be one drainer at once |
433 | folly::Baton<> callbackBaton, threadStartBaton; |
434 | consumer.fn = [&](int /* i */) { callbackBaton.wait(); }; |
435 | QueueConsumer competingConsumer; |
436 | competingConsumer.startConsuming(&eventBase, &queue); |
437 | queue.putMessage(1); |
438 | atomic<bool> raceA{false}; |
439 | atomic<bool> raceB{false}; |
440 | size_t numConsA = 0; |
441 | size_t numConsB = 0; |
442 | auto thread = std::thread([&] { |
443 | threadStartBaton.post(); |
444 | raceB = consumer.consumeUntilDrained(&numConsB) && numConsB; |
445 | }); |
446 | threadStartBaton.wait(); |
447 | raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA; |
448 | callbackBaton.post(); |
449 | thread.join(); |
450 | EXPECT_FALSE(raceA && raceB); |
451 | EXPECT_TRUE(raceA || raceB); |
452 | EXPECT_TRUE(raceA ^ raceB); |
453 | } |
454 | |
455 | TEST(NotificationQueueTest, ConsumeUntilDrainedStress) { |
456 | for (size_t i = 0; i < 1 << 8; ++i) { |
457 | // Basic tests: make sure we |
458 | // - drain all the messages |
459 | // - ignore any maxReadAtOnce |
460 | // - can't add messages during draining |
461 | EventBase eventBase; |
462 | IntQueue queue; |
463 | QueueConsumer consumer; |
464 | consumer.fn = [&](int j) { |
465 | EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error); |
466 | EXPECT_FALSE(queue.tryPutMessageNoThrow(j)); |
467 | EXPECT_THROW(queue.putMessage(j), std::runtime_error); |
468 | std::vector<int> ints{1, 2, 3}; |
469 | EXPECT_THROW( |
470 | queue.putMessages(ints.begin(), ints.end()), std::runtime_error); |
471 | }; |
472 | consumer.setMaxReadAtOnce(10); // We should ignore this |
473 | consumer.startConsuming(&eventBase, &queue); |
474 | for (int j = 0; j < 20; j++) { |
475 | queue.putMessage(j); |
476 | } |
477 | EXPECT_TRUE(consumer.consumeUntilDrained()); |
478 | EXPECT_EQ(20, consumer.messages.size()); |
479 | |
480 | // Make sure there can only be one drainer at once |
481 | folly::Baton<> callbackBaton, threadStartBaton; |
482 | consumer.fn = [&](int /* i */) { callbackBaton.wait(); }; |
483 | QueueConsumer competingConsumer; |
484 | competingConsumer.startConsuming(&eventBase, &queue); |
485 | queue.putMessage(1); |
486 | atomic<bool> raceA{false}; |
487 | atomic<bool> raceB{false}; |
488 | size_t numConsA = 0; |
489 | size_t numConsB = 0; |
490 | auto thread = std::thread([&] { |
491 | threadStartBaton.post(); |
492 | raceB = consumer.consumeUntilDrained(&numConsB) && numConsB; |
493 | }); |
494 | threadStartBaton.wait(); |
495 | raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA; |
496 | callbackBaton.post(); |
497 | thread.join(); |
498 | EXPECT_FALSE(raceA && raceB); |
499 | EXPECT_TRUE(raceA || raceB); |
500 | EXPECT_TRUE(raceA ^ raceB); |
501 | } |
502 | } |
503 | |
504 | #ifdef FOLLY_HAVE_EVENTFD |
505 | TEST(NotificationQueueTest, SendOneEventFD) { |
506 | QueueTest qt(0, IntQueue::FdType::EVENTFD); |
507 | qt.sendOne(); |
508 | } |
509 | |
510 | TEST(NotificationQueueTest, PutMessagesEventFD) { |
511 | QueueTest qt(0, IntQueue::FdType::EVENTFD); |
512 | qt.sendOne(); |
513 | } |
514 | |
515 | TEST(NotificationQueueTest, MultiConsumerEventFD) { |
516 | QueueTest qt(0, IntQueue::FdType::EVENTFD); |
517 | qt.multiConsumer(); |
518 | } |
519 | |
520 | TEST(NotificationQueueTest, MaxQueueSizeEventFD) { |
521 | QueueTest qt(5, IntQueue::FdType::EVENTFD); |
522 | qt.maxQueueSize(); |
523 | } |
524 | |
525 | TEST(NotificationQueueTest, MaxReadAtOnceEventFD) { |
526 | QueueTest qt(0, IntQueue::FdType::EVENTFD); |
527 | qt.maxReadAtOnce(); |
528 | } |
529 | |
530 | TEST(NotificationQueueTest, DestroyCallbackEventFD) { |
531 | QueueTest qt(0, IntQueue::FdType::EVENTFD); |
532 | qt.destroyCallback(); |
533 | } |
534 | #endif |
535 | |
536 | TEST(NotificationQueueTest, SendOnePipe) { |
537 | QueueTest qt(0, IntQueue::FdType::PIPE); |
538 | qt.sendOne(); |
539 | } |
540 | |
541 | TEST(NotificationQueueTest, PutMessagesPipe) { |
542 | QueueTest qt(0, IntQueue::FdType::PIPE); |
543 | qt.sendOne(); |
544 | } |
545 | |
546 | TEST(NotificationQueueTest, MultiConsumerPipe) { |
547 | QueueTest qt(0, IntQueue::FdType::PIPE); |
548 | qt.multiConsumer(); |
549 | } |
550 | |
551 | TEST(NotificationQueueTest, MaxQueueSizePipe) { |
552 | QueueTest qt(5, IntQueue::FdType::PIPE); |
553 | qt.maxQueueSize(); |
554 | } |
555 | |
556 | TEST(NotificationQueueTest, MaxReadAtOncePipe) { |
557 | QueueTest qt(0, IntQueue::FdType::PIPE); |
558 | qt.maxReadAtOnce(); |
559 | } |
560 | |
561 | TEST(NotificationQueueTest, DestroyCallbackPipe) { |
562 | QueueTest qt(0, IntQueue::FdType::PIPE); |
563 | qt.destroyCallback(); |
564 | } |
565 | |
566 | #ifndef _WIN32 |
567 | /* |
568 | * Test code that creates a NotificationQueue, then forks, and incorrectly |
569 | * tries to send a message to the queue from the child process. |
570 | * |
571 | * The child process should crash in this scenario, since the child code has a |
572 | * bug. (Older versions of NotificationQueue didn't catch this in the child, |
573 | * resulting in a crash in the parent process.) |
574 | */ |
575 | TEST(NotificationQueueTest, UseAfterFork) { |
576 | IntQueue queue; |
577 | int childStatus = 0; |
578 | QueueConsumer consumer; |
579 | |
580 | // Boost sets a custom SIGCHLD handler, which fails the test if a child |
581 | // process exits abnormally. We don't want this. |
582 | signal(SIGCHLD, SIG_DFL); |
583 | |
584 | // Log some info so users reading the test output aren't confused |
585 | // by the child process' crash log messages. |
586 | LOG(INFO) << "This test makes sure the child process crashes. " |
587 | << "Error log messagges and a backtrace are expected." ; |
588 | |
589 | { |
590 | // Start a separate thread consuming from the queue |
591 | ScopedEventBaseThread t1; |
592 | t1.getEventBase()->runInEventBaseThread( |
593 | [&] { consumer.startConsuming(t1.getEventBase(), &queue); }); |
594 | |
595 | // Send a message to it, just for sanity checking |
596 | queue.putMessage(1234); |
597 | |
598 | // Fork |
599 | pid_t pid = fork(); |
600 | if (pid == 0) { |
601 | // The boost test framework installs signal handlers to catch errors. |
602 | // We only want to catch in the parent. In the child let SIGABRT crash |
603 | // us normally. |
604 | signal(SIGABRT, SIG_DFL); |
605 | |
606 | // Child. |
607 | // We're horrible people, so we try to send a message to the queue |
608 | // that is being consumed in the parent process. |
609 | // |
610 | // The putMessage() call should catch this error, and crash our process. |
611 | queue.putMessage(9876); |
612 | // We shouldn't reach here. |
613 | _exit(0); |
614 | } |
615 | PCHECK(pid > 0); |
616 | |
617 | // Parent. Wait for the child to exit. |
618 | auto waited = waitpid(pid, &childStatus, 0); |
619 | EXPECT_EQ(pid, waited); |
620 | |
621 | // Send another message to the queue before we terminate the thread. |
622 | queue.putMessage(5678); |
623 | } |
624 | |
625 | // The child process should have crashed when it tried to call putMessage() |
626 | // on our NotificationQueue. |
627 | EXPECT_TRUE(WIFSIGNALED(childStatus)); |
628 | EXPECT_EQ(SIGABRT, WTERMSIG(childStatus)); |
629 | |
630 | // Make sure the parent saw the expected messages. |
631 | // It should have gotten 1234 and 5678 from the parent process, but not |
632 | // 9876 from the child. |
633 | EXPECT_EQ(2, consumer.messages.size()); |
634 | EXPECT_EQ(1234, consumer.messages.front()); |
635 | consumer.messages.pop_front(); |
636 | EXPECT_EQ(5678, consumer.messages.front()); |
637 | consumer.messages.pop_front(); |
638 | } |
639 | #endif |
640 | |
641 | TEST(NotificationQueueConsumer, make) { |
642 | int value = 0; |
643 | EventBase evb; |
644 | NotificationQueue<int> queue(32); |
645 | |
646 | auto consumer = |
647 | decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; }); |
648 | |
649 | consumer->startConsuming(&evb, &queue); |
650 | |
651 | int const newValue = 10; |
652 | queue.tryPutMessage(newValue); |
653 | |
654 | evb.loopOnce(); |
655 | |
656 | EXPECT_EQ(newValue, value); |
657 | } |
658 | |