1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/io/async/NotificationQueue.h>
18
19#include <sys/types.h>
20
21#include <iostream>
22#include <list>
23#include <thread>
24
25#include <folly/io/async/ScopedEventBaseThread.h>
26#include <folly/portability/GTest.h>
27#include <folly/synchronization/Baton.h>
28
29#ifndef _WIN32
30#include <sys/wait.h>
31#endif
32
33using namespace std;
34using namespace folly;
35
36typedef NotificationQueue<int> IntQueue;
37
38class QueueConsumer : public IntQueue::Consumer {
39 public:
40 QueueConsumer() {}
41
42 void messageAvailable(int&& value) noexcept override {
43 messages.push_back(value);
44 if (fn) {
45 fn(value);
46 }
47 }
48
49 std::function<void(int)> fn;
50 std::deque<int> messages;
51};
52
53class QueueTest {
54 public:
55 explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
56 : queue(maxSize, type), terminationQueue(maxSize, type) {}
57
58 void sendOne();
59 void putMessages();
60 void multiConsumer();
61 void maxQueueSize();
62 void maxReadAtOnce();
63 void destroyCallback();
64 void useAfterFork();
65
66 IntQueue queue;
67 IntQueue terminationQueue;
68};
69
70void QueueTest::sendOne() {
71 // Create a notification queue and a callback in this thread
72 EventBase eventBase;
73
74 QueueConsumer consumer;
75 consumer.fn = [&](int) {
76 // Stop consuming after we receive 1 message
77 consumer.stopConsuming();
78 };
79 consumer.startConsuming(&eventBase, &queue);
80
81 // Start a new EventBase thread to put a message on our queue
82 ScopedEventBaseThread t1;
83 t1.getEventBase()->runInEventBaseThread([&] { this->queue.putMessage(5); });
84
85 // Loop until we receive the message
86 eventBase.loop();
87
88 const auto& messages = consumer.messages;
89 EXPECT_EQ(1, messages.size());
90 EXPECT_EQ(5, messages.at(0));
91}
92
93void QueueTest::putMessages() {
94 EventBase eventBase;
95
96 QueueConsumer consumer;
97 QueueConsumer consumer2;
98 consumer.fn = [&](int msg) {
99 // Stop consuming after we receive a message with value 0, and start
100 // consumer2
101 if (msg == 0) {
102 consumer.stopConsuming();
103 consumer2.startConsuming(&eventBase, &queue);
104 }
105 };
106 consumer2.fn = [&](int msg) {
107 // Stop consuming after we receive a message with value 0
108 if (msg == 0) {
109 consumer2.stopConsuming();
110 }
111 };
112 consumer.startConsuming(&eventBase, &queue);
113
114 list<int> msgList = {1, 2, 3, 4};
115 vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0};
116 // Call putMessages() several times to add messages to the queue
117 queue.putMessages(msgList.begin(), msgList.end());
118 queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
119 // Test sending 17 messages, the pipe-based queue calls write in 16 byte
120 // chunks
121 queue.putMessages(msgVector.begin(), msgVector.end());
122
123 // Loop until the consumer has stopped
124 eventBase.loop();
125
126 vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0};
127 vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0};
128 EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
129 for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
130 EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
131 }
132 EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
133 for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
134 EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
135 }
136}
137
138void QueueTest::multiConsumer() {
139 uint32_t numConsumers = 8;
140 uint32_t numMessages = 10000;
141
142 // Create several consumers each running in their own EventBase thread
143 vector<QueueConsumer> consumers(numConsumers);
144 vector<ScopedEventBaseThread> threads(numConsumers);
145
146 for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
147 QueueConsumer* consumer = &consumers[consumerIdx];
148
149 consumer->fn = [consumer, consumerIdx, this](int value) {
150 // Treat 0 as a signal to stop.
151 if (value == 0) {
152 consumer->stopConsuming();
153 // Put a message on the terminationQueue to indicate we have stopped
154 terminationQueue.putMessage(consumerIdx);
155 }
156 };
157
158 EventBase* eventBase = threads[consumerIdx].getEventBase();
159 eventBase->runInEventBaseThread([eventBase, consumer, this] {
160 consumer->startConsuming(eventBase, &queue);
161 });
162 }
163
164 // Now add a number of messages from this thread
165 // Start at 1 rather than 0, since 0 is the signal to stop.
166 for (uint32_t n = 1; n < numMessages; ++n) {
167 queue.putMessage(n);
168 }
169 // Now add a 0 for each consumer, to signal them to stop
170 for (uint32_t n = 0; n < numConsumers; ++n) {
171 queue.putMessage(0);
172 }
173
174 // Wait until we get notified that all of the consumers have stopped
175 // We use a separate notification queue for this.
176 QueueConsumer terminationConsumer;
177 vector<uint32_t> consumersStopped(numConsumers, 0);
178 uint32_t consumersRemaining = numConsumers;
179 terminationConsumer.fn = [&](int consumerIdx) {
180 --consumersRemaining;
181 if (consumersRemaining == 0) {
182 terminationConsumer.stopConsuming();
183 }
184
185 EXPECT_GE(consumerIdx, 0);
186 EXPECT_LT(consumerIdx, numConsumers);
187 ++consumersStopped[consumerIdx];
188 };
189 EventBase eventBase;
190 terminationConsumer.startConsuming(&eventBase, &terminationQueue);
191 eventBase.loop();
192
193 // Verify that we saw exactly 1 stop message for each consumer
194 for (uint32_t n = 0; n < numConsumers; ++n) {
195 EXPECT_EQ(1, consumersStopped[n]);
196 }
197
198 // Validate that every message sent to the main queue was received exactly
199 // once.
200 vector<int> messageCount(numMessages, 0);
201 for (uint32_t n = 0; n < numConsumers; ++n) {
202 for (int msg : consumers[n].messages) {
203 EXPECT_GE(msg, 0);
204 EXPECT_LT(msg, numMessages);
205 ++messageCount[msg];
206 }
207 }
208
209 // 0 is the signal to stop, and should have been received once by each
210 // consumer
211 EXPECT_EQ(numConsumers, messageCount[0]);
212 // All other messages should have been received exactly once
213 for (uint32_t n = 1; n < numMessages; ++n) {
214 EXPECT_EQ(1, messageCount[n]);
215 }
216}
217
218void QueueTest::maxQueueSize() {
219 // Create a queue with a maximum size of 5, and fill it up
220
221 for (int n = 0; n < 5; ++n) {
222 queue.tryPutMessage(n);
223 }
224
225 // Calling tryPutMessage() now should fail
226 EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
227
228 EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
229 int val = 5;
230 EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
231
232 // Pop a message from the queue
233 int result = -1;
234 EXPECT_TRUE(queue.tryConsume(result));
235 EXPECT_EQ(0, result);
236
237 // We should be able to write another message now that we popped one off.
238 queue.tryPutMessage(5);
239 // But now we are full again.
240 EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
241 // putMessage() should let us exceed the maximum
242 queue.putMessage(6);
243
244 // Pull another mesage off
245 EXPECT_TRUE(queue.tryConsume(result));
246 EXPECT_EQ(1, result);
247
248 // tryPutMessage() should still fail since putMessage() actually put us over
249 // the max.
250 EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
251
252 // Pull another message off and try again
253 EXPECT_TRUE(queue.tryConsume(result));
254 EXPECT_EQ(2, result);
255 queue.tryPutMessage(7);
256
257 // Now pull all the remaining messages off
258 EXPECT_TRUE(queue.tryConsume(result));
259 EXPECT_EQ(3, result);
260 EXPECT_TRUE(queue.tryConsume(result));
261 EXPECT_EQ(4, result);
262 EXPECT_TRUE(queue.tryConsume(result));
263 EXPECT_EQ(5, result);
264 EXPECT_TRUE(queue.tryConsume(result));
265 EXPECT_EQ(6, result);
266 EXPECT_TRUE(queue.tryConsume(result));
267 EXPECT_EQ(7, result);
268
269 // There should be no messages left
270 result = -1;
271 EXPECT_TRUE(!queue.tryConsume(result));
272 EXPECT_EQ(-1, result);
273}
274
275void QueueTest::maxReadAtOnce() {
276 // Add 100 messages to the queue
277 for (int n = 0; n < 100; ++n) {
278 queue.putMessage(n);
279 }
280
281 EventBase eventBase;
282
283 // Record how many messages were processed each loop iteration.
284 uint32_t messagesThisLoop = 0;
285 std::vector<uint32_t> messagesPerLoop;
286 std::function<void()> loopFinished = [&] {
287 // Record the current number of messages read this loop
288 messagesPerLoop.push_back(messagesThisLoop);
289 // Reset messagesThisLoop to 0 for the next loop
290 messagesThisLoop = 0;
291
292 // To prevent use-after-free bugs when eventBase destructs,
293 // prevent calling runInLoop any more after the test is finished.
294 // 55 == number of times loop should run.
295 if (messagesPerLoop.size() != 55) {
296 // Reschedule ourself to run at the end of the next loop
297 eventBase.runInLoop(loopFinished);
298 }
299 };
300 // Schedule the first call to loopFinished
301 eventBase.runInLoop(loopFinished);
302
303 QueueConsumer consumer;
304 // Read the first 50 messages 10 at a time.
305 consumer.setMaxReadAtOnce(10);
306 consumer.fn = [&](int value) {
307 ++messagesThisLoop;
308 // After 50 messages, drop to reading only 1 message at a time.
309 if (value == 50) {
310 consumer.setMaxReadAtOnce(1);
311 }
312 // Terminate the loop when we reach the end of the messages.
313 if (value == 99) {
314 eventBase.terminateLoopSoon();
315 }
316 };
317 consumer.startConsuming(&eventBase, &queue);
318
319 // Run the event loop until the consumer terminates it
320 eventBase.loop();
321
322 // The consumer should have read all 100 messages in order
323 EXPECT_EQ(100, consumer.messages.size());
324 for (int n = 0; n < 100; ++n) {
325 EXPECT_EQ(n, consumer.messages.at(n));
326 }
327
328 // Currently EventBase happens to still run the loop callbacks even after
329 // terminateLoopSoon() is called. However, we don't really want to depend on
330 // this behavior. In case this ever changes in the future, add
331 // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
332 // last loop iteration.
333 if (messagesThisLoop > 0) {
334 messagesPerLoop.push_back(messagesThisLoop);
335 messagesThisLoop = 0;
336 }
337
338 // For the first 5 loops it should have read 10 messages each time.
339 // After that it should have read 1 messages per loop for the next 50 loops.
340 EXPECT_EQ(55, messagesPerLoop.size());
341 for (int n = 0; n < 5; ++n) {
342 EXPECT_EQ(10, messagesPerLoop.at(n));
343 }
344 for (int n = 5; n < 55; ++n) {
345 EXPECT_EQ(1, messagesPerLoop.at(n));
346 }
347}
348
349void QueueTest::destroyCallback() {
350 // Rather than using QueueConsumer, define a separate class for the destroy
351 // test. The DestroyTestConsumer will delete itself inside the
352 // messageAvailable() callback. With a regular QueueConsumer this would
353 // destroy the std::function object while the function is running, which we
354 // should probably avoid doing. This uses a pointer to a std::function to
355 // avoid destroying the function object.
356 class DestroyTestConsumer : public IntQueue::Consumer {
357 public:
358 void messageAvailable(int&& value) noexcept override {
359 DestructorGuard g(this);
360 if (fn && *fn) {
361 (*fn)(value);
362 }
363 }
364
365 std::function<void(int)>* fn;
366
367 protected:
368 ~DestroyTestConsumer() override = default;
369 };
370
371 EventBase eventBase;
372 // Create a queue and add 2 messages to it
373 queue.putMessage(1);
374 queue.putMessage(2);
375
376 // Create two QueueConsumers allocated on the heap.
377 // Have whichever one gets called first destroy both of the QueueConsumers.
378 // This way one consumer will be destroyed from inside its messageAvailable()
379 // callback, and one consume will be destroyed when it isn't inside
380 // messageAvailable().
381 std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
382 consumer1(new DestroyTestConsumer);
383 std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
384 consumer2(new DestroyTestConsumer);
385 std::function<void(int)> fn = [&](int) {
386 consumer1 = nullptr;
387 consumer2 = nullptr;
388 };
389 consumer1->fn = &fn;
390 consumer2->fn = &fn;
391
392 consumer1->startConsuming(&eventBase, &queue);
393 consumer2->startConsuming(&eventBase, &queue);
394
395 // Run the event loop.
396 eventBase.loop();
397
398 // One of the consumers should have fired, received the message,
399 // then destroyed both consumers.
400 EXPECT_TRUE(!consumer1);
401 EXPECT_TRUE(!consumer2);
402 // One message should be left in the queue
403 int result = 1;
404 EXPECT_TRUE(queue.tryConsume(result));
405 EXPECT_EQ(2, result);
406}
407
408TEST(NotificationQueueTest, ConsumeUntilDrained) {
409 // Basic tests: make sure we
410 // - drain all the messages
411 // - ignore any maxReadAtOnce
412 // - can't add messages during draining
413 EventBase eventBase;
414 IntQueue queue;
415 QueueConsumer consumer;
416 consumer.fn = [&](int i) {
417 EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
418 EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
419 EXPECT_THROW(queue.putMessage(i), std::runtime_error);
420 std::vector<int> ints{1, 2, 3};
421 EXPECT_THROW(
422 queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
423 };
424 consumer.setMaxReadAtOnce(10); // We should ignore this
425 consumer.startConsuming(&eventBase, &queue);
426 for (int i = 0; i < 20; i++) {
427 queue.putMessage(i);
428 }
429 EXPECT_TRUE(consumer.consumeUntilDrained());
430 EXPECT_EQ(20, consumer.messages.size());
431
432 // Make sure there can only be one drainer at once
433 folly::Baton<> callbackBaton, threadStartBaton;
434 consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
435 QueueConsumer competingConsumer;
436 competingConsumer.startConsuming(&eventBase, &queue);
437 queue.putMessage(1);
438 atomic<bool> raceA{false};
439 atomic<bool> raceB{false};
440 size_t numConsA = 0;
441 size_t numConsB = 0;
442 auto thread = std::thread([&] {
443 threadStartBaton.post();
444 raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
445 });
446 threadStartBaton.wait();
447 raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
448 callbackBaton.post();
449 thread.join();
450 EXPECT_FALSE(raceA && raceB);
451 EXPECT_TRUE(raceA || raceB);
452 EXPECT_TRUE(raceA ^ raceB);
453}
454
455TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
456 for (size_t i = 0; i < 1 << 8; ++i) {
457 // Basic tests: make sure we
458 // - drain all the messages
459 // - ignore any maxReadAtOnce
460 // - can't add messages during draining
461 EventBase eventBase;
462 IntQueue queue;
463 QueueConsumer consumer;
464 consumer.fn = [&](int j) {
465 EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
466 EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
467 EXPECT_THROW(queue.putMessage(j), std::runtime_error);
468 std::vector<int> ints{1, 2, 3};
469 EXPECT_THROW(
470 queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
471 };
472 consumer.setMaxReadAtOnce(10); // We should ignore this
473 consumer.startConsuming(&eventBase, &queue);
474 for (int j = 0; j < 20; j++) {
475 queue.putMessage(j);
476 }
477 EXPECT_TRUE(consumer.consumeUntilDrained());
478 EXPECT_EQ(20, consumer.messages.size());
479
480 // Make sure there can only be one drainer at once
481 folly::Baton<> callbackBaton, threadStartBaton;
482 consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
483 QueueConsumer competingConsumer;
484 competingConsumer.startConsuming(&eventBase, &queue);
485 queue.putMessage(1);
486 atomic<bool> raceA{false};
487 atomic<bool> raceB{false};
488 size_t numConsA = 0;
489 size_t numConsB = 0;
490 auto thread = std::thread([&] {
491 threadStartBaton.post();
492 raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
493 });
494 threadStartBaton.wait();
495 raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
496 callbackBaton.post();
497 thread.join();
498 EXPECT_FALSE(raceA && raceB);
499 EXPECT_TRUE(raceA || raceB);
500 EXPECT_TRUE(raceA ^ raceB);
501 }
502}
503
504#ifdef FOLLY_HAVE_EVENTFD
505TEST(NotificationQueueTest, SendOneEventFD) {
506 QueueTest qt(0, IntQueue::FdType::EVENTFD);
507 qt.sendOne();
508}
509
510TEST(NotificationQueueTest, PutMessagesEventFD) {
511 QueueTest qt(0, IntQueue::FdType::EVENTFD);
512 qt.sendOne();
513}
514
515TEST(NotificationQueueTest, MultiConsumerEventFD) {
516 QueueTest qt(0, IntQueue::FdType::EVENTFD);
517 qt.multiConsumer();
518}
519
520TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
521 QueueTest qt(5, IntQueue::FdType::EVENTFD);
522 qt.maxQueueSize();
523}
524
525TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
526 QueueTest qt(0, IntQueue::FdType::EVENTFD);
527 qt.maxReadAtOnce();
528}
529
530TEST(NotificationQueueTest, DestroyCallbackEventFD) {
531 QueueTest qt(0, IntQueue::FdType::EVENTFD);
532 qt.destroyCallback();
533}
534#endif
535
536TEST(NotificationQueueTest, SendOnePipe) {
537 QueueTest qt(0, IntQueue::FdType::PIPE);
538 qt.sendOne();
539}
540
541TEST(NotificationQueueTest, PutMessagesPipe) {
542 QueueTest qt(0, IntQueue::FdType::PIPE);
543 qt.sendOne();
544}
545
546TEST(NotificationQueueTest, MultiConsumerPipe) {
547 QueueTest qt(0, IntQueue::FdType::PIPE);
548 qt.multiConsumer();
549}
550
551TEST(NotificationQueueTest, MaxQueueSizePipe) {
552 QueueTest qt(5, IntQueue::FdType::PIPE);
553 qt.maxQueueSize();
554}
555
556TEST(NotificationQueueTest, MaxReadAtOncePipe) {
557 QueueTest qt(0, IntQueue::FdType::PIPE);
558 qt.maxReadAtOnce();
559}
560
561TEST(NotificationQueueTest, DestroyCallbackPipe) {
562 QueueTest qt(0, IntQueue::FdType::PIPE);
563 qt.destroyCallback();
564}
565
566#ifndef _WIN32
567/*
568 * Test code that creates a NotificationQueue, then forks, and incorrectly
569 * tries to send a message to the queue from the child process.
570 *
571 * The child process should crash in this scenario, since the child code has a
572 * bug. (Older versions of NotificationQueue didn't catch this in the child,
573 * resulting in a crash in the parent process.)
574 */
575TEST(NotificationQueueTest, UseAfterFork) {
576 IntQueue queue;
577 int childStatus = 0;
578 QueueConsumer consumer;
579
580 // Boost sets a custom SIGCHLD handler, which fails the test if a child
581 // process exits abnormally. We don't want this.
582 signal(SIGCHLD, SIG_DFL);
583
584 // Log some info so users reading the test output aren't confused
585 // by the child process' crash log messages.
586 LOG(INFO) << "This test makes sure the child process crashes. "
587 << "Error log messagges and a backtrace are expected.";
588
589 {
590 // Start a separate thread consuming from the queue
591 ScopedEventBaseThread t1;
592 t1.getEventBase()->runInEventBaseThread(
593 [&] { consumer.startConsuming(t1.getEventBase(), &queue); });
594
595 // Send a message to it, just for sanity checking
596 queue.putMessage(1234);
597
598 // Fork
599 pid_t pid = fork();
600 if (pid == 0) {
601 // The boost test framework installs signal handlers to catch errors.
602 // We only want to catch in the parent. In the child let SIGABRT crash
603 // us normally.
604 signal(SIGABRT, SIG_DFL);
605
606 // Child.
607 // We're horrible people, so we try to send a message to the queue
608 // that is being consumed in the parent process.
609 //
610 // The putMessage() call should catch this error, and crash our process.
611 queue.putMessage(9876);
612 // We shouldn't reach here.
613 _exit(0);
614 }
615 PCHECK(pid > 0);
616
617 // Parent. Wait for the child to exit.
618 auto waited = waitpid(pid, &childStatus, 0);
619 EXPECT_EQ(pid, waited);
620
621 // Send another message to the queue before we terminate the thread.
622 queue.putMessage(5678);
623 }
624
625 // The child process should have crashed when it tried to call putMessage()
626 // on our NotificationQueue.
627 EXPECT_TRUE(WIFSIGNALED(childStatus));
628 EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
629
630 // Make sure the parent saw the expected messages.
631 // It should have gotten 1234 and 5678 from the parent process, but not
632 // 9876 from the child.
633 EXPECT_EQ(2, consumer.messages.size());
634 EXPECT_EQ(1234, consumer.messages.front());
635 consumer.messages.pop_front();
636 EXPECT_EQ(5678, consumer.messages.front());
637 consumer.messages.pop_front();
638}
639#endif
640
641TEST(NotificationQueueConsumer, make) {
642 int value = 0;
643 EventBase evb;
644 NotificationQueue<int> queue(32);
645
646 auto consumer =
647 decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; });
648
649 consumer->startConsuming(&evb, &queue);
650
651 int const newValue = 10;
652 queue.tryPutMessage(newValue);
653
654 evb.loopOnce();
655
656 EXPECT_EQ(newValue, value);
657}
658