1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <sys/types.h>
20
21#include <algorithm>
22#include <iterator>
23#include <memory>
24#include <stdexcept>
25#include <utility>
26
27#include <boost/intrusive/slist.hpp>
28#include <folly/Exception.h>
29#include <folly/FileUtil.h>
30#include <folly/Likely.h>
31#include <folly/ScopeGuard.h>
32#include <folly/SpinLock.h>
33#include <folly/io/async/DelayedDestruction.h>
34#include <folly/io/async/EventBase.h>
35#include <folly/io/async/EventHandler.h>
36#include <folly/io/async/Request.h>
37#include <folly/portability/Fcntl.h>
38#include <folly/portability/Sockets.h>
39#include <folly/portability/Unistd.h>
40
41#include <glog/logging.h>
42
43#if __linux__ && !__ANDROID__
44#define FOLLY_HAVE_EVENTFD
45#include <folly/io/async/EventFDWrapper.h>
46#endif
47
48namespace folly {
49
50/**
51 * A producer-consumer queue for passing messages between EventBase threads.
52 *
53 * Messages can be added to the queue from any thread. Multiple consumers may
54 * listen to the queue from multiple EventBase threads.
55 *
56 * A NotificationQueue may not be destroyed while there are still consumers
57 * registered to receive events from the queue. It is the user's
58 * responsibility to ensure that all consumers are unregistered before the
59 * queue is destroyed.
60 *
61 * MessageT should be MoveConstructible (i.e., must support either a move
 * constructor or a copy constructor, or both). Ideally its move constructor
63 * (or copy constructor if no move constructor is provided) should never throw
64 * exceptions. If the constructor may throw, the consumers could end up
65 * spinning trying to move a message off the queue and failing, and then
66 * retrying.
67 */
68template <typename MessageT>
69class NotificationQueue {
70 struct Node : public boost::intrusive::slist_base_hook<
71 boost::intrusive::cache_last<true>> {
72 template <typename MessageTT>
73 Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx)
74 : msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {}
75 MessageT msg_;
76 std::shared_ptr<RequestContext> ctx_;
77 };
78
79 public:
80 /**
81 * A callback interface for consuming messages from the queue as they arrive.
82 */
83 class Consumer : public DelayedDestruction, private EventHandler {
84 public:
85 enum : uint16_t { kDefaultMaxReadAtOnce = 10 };
86
87 Consumer()
88 : queue_(nullptr),
89 destroyedFlagPtr_(nullptr),
90 maxReadAtOnce_(kDefaultMaxReadAtOnce) {}
91
92 // create a consumer in-place, without the need to build new class
93 template <typename TCallback>
94 static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make(
95 TCallback&& callback);
96
97 /**
98 * messageAvailable() will be invoked whenever a new
99 * message is available from the pipe.
100 */
101 virtual void messageAvailable(MessageT&& message) noexcept = 0;
102
103 /**
104 * Begin consuming messages from the specified queue.
105 *
106 * messageAvailable() will be called whenever a message is available. This
107 * consumer will continue to consume messages until stopConsuming() is
108 * called.
109 *
110 * A Consumer may only consume messages from a single NotificationQueue at
111 * a time. startConsuming() should not be called if this consumer is
112 * already consuming.
113 */
114 void startConsuming(EventBase* eventBase, NotificationQueue* queue) {
115 init(eventBase, queue);
116 registerHandler(READ | PERSIST);
117 }
118
119 /**
120 * Same as above but registers this event handler as internal so that it
121 * doesn't count towards the pending reader count for the IOLoop.
122 */
123 void startConsumingInternal(
124 EventBase* eventBase,
125 NotificationQueue* queue) {
126 init(eventBase, queue);
127 registerInternalHandler(READ | PERSIST);
128 }
129
130 /**
131 * Stop consuming messages.
132 *
133 * startConsuming() may be called again to resume consumption of messages
134 * at a later point in time.
135 */
136 void stopConsuming();
137
138 /**
139 * Consume messages off the queue until it is empty. No messages may be
140 * added to the queue while it is draining, so that the process is bounded.
141 * To that end, putMessage/tryPutMessage will throw an std::runtime_error,
142 * and tryPutMessageNoThrow will return false.
143 *
144 * @returns true if the queue was drained, false otherwise. In practice,
145 * this will only fail if someone else is already draining the queue.
146 */
147 bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept;
148
149 /**
150 * Get the NotificationQueue that this consumer is currently consuming
151 * messages from. Returns nullptr if the consumer is not currently
152 * consuming events from any queue.
153 */
154 NotificationQueue* getCurrentQueue() const {
155 return queue_;
156 }
157
158 /**
159 * Set a limit on how many messages this consumer will read each iteration
160 * around the event loop.
161 *
162 * This helps rate-limit how much work the Consumer will do each event loop
163 * iteration, to prevent it from starving other event handlers.
164 *
165 * A limit of 0 means no limit will be enforced. If unset, the limit
166 * defaults to kDefaultMaxReadAtOnce (defined to 10 above).
167 */
168 void setMaxReadAtOnce(uint32_t maxAtOnce) {
169 maxReadAtOnce_ = maxAtOnce;
170 }
171 uint32_t getMaxReadAtOnce() const {
172 return maxReadAtOnce_;
173 }
174
175 EventBase* getEventBase() {
176 return base_;
177 }
178
179 void handlerReady(uint16_t events) noexcept override;
180
181 protected:
182 void destroy() override;
183
184 ~Consumer() override {}
185
186 private:
187 /**
188 * Consume messages off the the queue until
189 * - the queue is empty (1), or
190 * - until the consumer is destroyed, or
191 * - until the consumer is uninstalled, or
192 * - an exception is thrown in the course of dequeueing, or
193 * - unless isDrain is true, until the maxReadAtOnce_ limit is hit
194 *
195 * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation.
196 */
197 void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept;
198
199 void setActive(bool active, bool shouldLock = false) {
200 if (!queue_) {
201 active_ = active;
202 return;
203 }
204 if (shouldLock) {
205 queue_->spinlock_.lock();
206 }
207 if (!active_ && active) {
208 ++queue_->numActiveConsumers_;
209 } else if (active_ && !active) {
210 --queue_->numActiveConsumers_;
211 }
212 active_ = active;
213 if (shouldLock) {
214 queue_->spinlock_.unlock();
215 }
216 }
217 void init(EventBase* eventBase, NotificationQueue* queue);
218
219 NotificationQueue* queue_;
220 bool* destroyedFlagPtr_;
221 uint32_t maxReadAtOnce_;
222 EventBase* base_;
223 bool active_{false};
224 };
225
226 class SimpleConsumer {
227 public:
228 explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) {
229 ++queue_.numConsumers_;
230 }
231
232 ~SimpleConsumer() {
233 --queue_.numConsumers_;
234 }
235
236 int getFd() const {
237 return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0];
238 }
239
240 template <typename F>
241 void consumeUntilDrained(F&& foreach);
242
243 private:
244 NotificationQueue& queue_;
245 };
246
247 enum class FdType {
248 PIPE,
249#ifdef FOLLY_HAVE_EVENTFD
250 EVENTFD,
251#endif
252 };
253
254 /**
255 * Create a new NotificationQueue.
256 *
257 * If the maxSize parameter is specified, this sets the maximum queue size
258 * that will be enforced by tryPutMessage(). (This size is advisory, and may
259 * be exceeded if producers explicitly use putMessage() instead of
260 * tryPutMessage().)
261 *
262 * The fdType parameter determines the type of file descriptor used
263 * internally to signal message availability. The default (eventfd) is
264 * preferable for performance and because it won't fail when the queue gets
265 * too long. It is not available on on older and non-linux kernels, however.
266 * In this case the code will fall back to using a pipe, the parameter is
267 * mostly for testing purposes.
268 */
269 explicit NotificationQueue(
270 uint32_t maxSize = 0,
271#ifdef FOLLY_HAVE_EVENTFD
272 FdType fdType = FdType::EVENTFD)
273#else
274 FdType fdType = FdType::PIPE)
275#endif
276 : eventfd_(-1),
277 pipeFds_{-1, -1},
278 advisoryMaxQueueSize_(maxSize),
279 pid_(pid_t(getpid())) {
280
281#ifdef FOLLY_HAVE_EVENTFD
282 if (fdType == FdType::EVENTFD) {
283 eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
284 if (eventfd_ == -1) {
285 if (errno == ENOSYS || errno == EINVAL) {
286 // eventfd not availalble
287 LOG(ERROR) << "failed to create eventfd for NotificationQueue: "
288 << errno << ", falling back to pipe mode (is your kernel "
289 << "> 2.6.30?)";
290 fdType = FdType::PIPE;
291 } else {
292 // some other error
293 folly::throwSystemError(
294 "Failed to create eventfd for "
295 "NotificationQueue",
296 errno);
297 }
298 }
299 }
300#endif
301 if (fdType == FdType::PIPE) {
302 if (pipe(pipeFds_)) {
303 folly::throwSystemError(
304 "Failed to create pipe for NotificationQueue", errno);
305 }
306 try {
307 // put both ends of the pipe into non-blocking mode
308 if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) {
309 folly::throwSystemError(
310 "failed to put NotificationQueue pipe read "
311 "endpoint into non-blocking mode",
312 errno);
313 }
314 if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) {
315 folly::throwSystemError(
316 "failed to put NotificationQueue pipe write "
317 "endpoint into non-blocking mode",
318 errno);
319 }
320 } catch (...) {
321 ::close(pipeFds_[0]);
322 ::close(pipeFds_[1]);
323 throw;
324 }
325 }
326 }
327
328 ~NotificationQueue() {
329 std::unique_ptr<Node> data;
330 while (!queue_.empty()) {
331 data.reset(&queue_.front());
332 queue_.pop_front();
333 }
334 if (eventfd_ >= 0) {
335 ::close(eventfd_);
336 eventfd_ = -1;
337 }
338 if (pipeFds_[0] >= 0) {
339 ::close(pipeFds_[0]);
340 pipeFds_[0] = -1;
341 }
342 if (pipeFds_[1] >= 0) {
343 ::close(pipeFds_[1]);
344 pipeFds_[1] = -1;
345 }
346 }
347
348 /**
349 * Set the advisory maximum queue size.
350 *
351 * This maximum queue size affects calls to tryPutMessage(). Message
352 * producers can still use the putMessage() call to unconditionally put a
353 * message on the queue, ignoring the configured maximum queue size. This
354 * can cause the queue size to exceed the configured maximum.
355 */
356 void setMaxQueueSize(uint32_t max) {
357 advisoryMaxQueueSize_ = max;
358 }
359
360 /**
361 * Attempt to put a message on the queue if the queue is not already full.
362 *
363 * If the queue is full, a std::overflow_error will be thrown. The
364 * setMaxQueueSize() function controls the maximum queue size.
365 *
366 * If the queue is currently draining, an std::runtime_error will be thrown.
367 *
368 * This method may contend briefly on a spinlock if many threads are
369 * concurrently accessing the queue, but for all intents and purposes it will
370 * immediately place the message on the queue and return.
371 *
372 * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and
373 * may throw any other exception thrown by the MessageT move/copy
374 * constructor.
375 */
376 template <typename MessageTT>
377 void tryPutMessage(MessageTT&& message) {
378 putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_);
379 }
380
381 /**
382 * No-throw versions of the above. Instead returns true on success, false on
383 * failure.
384 *
385 * Only std::overflow_error (the common exception case) and std::runtime_error
386 * (which indicates that the queue is being drained) are prevented from being
387 * thrown. User code must still catch std::bad_alloc errors.
388 */
389 template <typename MessageTT>
390 bool tryPutMessageNoThrow(MessageTT&& message) {
391 return putMessageImpl(
392 std::forward<MessageTT>(message), advisoryMaxQueueSize_, false);
393 }
394
395 /**
396 * Unconditionally put a message on the queue.
397 *
398 * This method is like tryPutMessage(), but ignores the maximum queue size
399 * and always puts the message on the queue, even if the maximum queue size
400 * would be exceeded.
401 *
402 * putMessage() may throw
403 * - std::bad_alloc if memory allocation fails, and may
404 * - std::runtime_error if the queue is currently draining
405 * - any other exception thrown by the MessageT move/copy constructor.
406 */
407 template <typename MessageTT>
408 void putMessage(MessageTT&& message) {
409 putMessageImpl(std::forward<MessageTT>(message), 0);
410 }
411
412 /**
413 * Put several messages on the queue.
414 */
415 template <typename InputIteratorT>
416 void putMessages(InputIteratorT first, InputIteratorT last) {
417 typedef typename std::iterator_traits<InputIteratorT>::iterator_category
418 IterCategory;
419 putMessagesImpl(first, last, IterCategory());
420 }
421
422 /**
423 * Try to immediately pull a message off of the queue, without blocking.
424 *
425 * If a message is immediately available, the result parameter will be
426 * updated to contain the message contents and true will be returned.
427 *
428 * If no message is available, false will be returned and result will be left
429 * unmodified.
430 */
431 bool tryConsume(MessageT& result) {
432 SCOPE_EXIT {
433 syncSignalAndQueue();
434 };
435
436 checkPid();
437 std::unique_ptr<Node> data;
438
439 {
440 folly::SpinLockGuard g(spinlock_);
441
442 if (UNLIKELY(queue_.empty())) {
443 return false;
444 }
445
446 data.reset(&queue_.front());
447 queue_.pop_front();
448 }
449
450 result = std::move(data->msg_);
451 RequestContext::setContext(std::move(data->ctx_));
452
453 return true;
454 }
455
456 size_t size() const {
457 folly::SpinLockGuard g(spinlock_);
458 return queue_.size();
459 }
460
461 /**
462 * Check that the NotificationQueue is being used from the correct process.
463 *
464 * If you create a NotificationQueue in one process, then fork, and try to
465 * send messages to the queue from the child process, you're going to have a
466 * bad time. Unfortunately users have (accidentally) run into this.
467 *
468 * Because we use an eventfd/pipe, the child process can actually signal the
469 * parent process that an event is ready. However, it can't put anything on
470 * the parent's queue, so the parent wakes up and finds an empty queue. This
471 * check ensures that we catch the problem in the misbehaving child process
472 * code, and crash before signalling the parent process.
473 */
474 void checkPid() const {
475 CHECK_EQ(pid_, pid_t(getpid()));
476 }
477
478 private:
479 // Forbidden copy constructor and assignment operator
480 NotificationQueue(NotificationQueue const&) = delete;
481 NotificationQueue& operator=(NotificationQueue const&) = delete;
482
483 inline bool checkQueueSize(size_t maxSize, bool throws = true) const {
484 DCHECK(0 == spinlock_.try_lock());
485 if (maxSize > 0 && queue_.size() >= maxSize) {
486 if (throws) {
487 throw std::overflow_error(
488 "unable to add message to NotificationQueue: "
489 "queue is full");
490 }
491 return false;
492 }
493 return true;
494 }
495
496 inline bool checkDraining(bool throws = true) {
497 if (UNLIKELY(draining_ && throws)) {
498 throw std::runtime_error("queue is draining, cannot add message");
499 }
500 return draining_;
501 }
502
#ifdef __ANDROID__
  // TODO 10860938 Remove after figuring out crash
  // Diagnostic counters: bytes written to the notification descriptor that
  // have not been read back yet, and the highest value that count reached.
  mutable std::atomic<int> eventBytes_{0};
  mutable std::atomic<int> maxEventBytes_{0};
#endif
508
509 void ensureSignalLocked() const {
510 // semantics: empty fd == empty queue <=> !signal_
511 if (signal_) {
512 return;
513 }
514
515 ssize_t bytes_written = 0;
516 size_t bytes_expected = 0;
517
518 do {
519 if (eventfd_ >= 0) {
520 // eventfd(2) dictates that we must write a 64-bit integer
521 uint64_t signal = 1;
522 bytes_expected = sizeof(signal);
523 bytes_written = ::write(eventfd_, &signal, bytes_expected);
524 } else {
525 uint8_t signal = 1;
526 bytes_expected = sizeof(signal);
527 bytes_written = ::write(pipeFds_[1], &signal, bytes_expected);
528 }
529 } while (bytes_written == -1 && errno == EINTR);
530
531#ifdef __ANDROID__
532 if (bytes_written > 0) {
533 eventBytes_ += bytes_written;
534 maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_);
535 }
536#endif
537
538 if (bytes_written == ssize_t(bytes_expected)) {
539 signal_ = true;
540 } else {
541#ifdef __ANDROID__
542 LOG(ERROR) << "NotificationQueue Write Error=" << errno
543 << " bytesInPipe=" << eventBytes_
544 << " maxInPipe=" << maxEventBytes_ << " queue=" << size();
545#endif
546 folly::throwSystemError(
547 "failed to signal NotificationQueue after "
548 "write",
549 errno);
550 }
551 }
552
553 void drainSignalsLocked() {
554 ssize_t bytes_read = 0;
555 if (eventfd_ > 0) {
556 uint64_t message;
557 bytes_read = readNoInt(eventfd_, &message, sizeof(message));
558 CHECK(bytes_read != -1 || errno == EAGAIN);
559 } else {
560 // There should only be one byte in the pipe. To avoid potential leaks we
561 // still drain.
562 uint8_t message[32];
563 ssize_t result;
564 while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) !=
565 -1) {
566 bytes_read += result;
567 }
568 CHECK(result == -1 && errno == EAGAIN);
569 LOG_IF(ERROR, bytes_read > 1)
570 << "[NotificationQueue] Unexpected state while draining pipe: bytes_read="
571 << bytes_read << " bytes, expected <= 1";
572 }
573 LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0))
574 << "[NotificationQueue] Unexpected state while draining signals: signal_="
575 << signal_ << " bytes_read=" << bytes_read;
576
577 signal_ = false;
578
579#ifdef __ANDROID__
580 if (bytes_read > 0) {
581 eventBytes_ -= bytes_read;
582 }
583#endif
584 }
585
586 void ensureSignal() const {
587 folly::SpinLockGuard g(spinlock_);
588 ensureSignalLocked();
589 }
590
591 void syncSignalAndQueue() {
592 folly::SpinLockGuard g(spinlock_);
593
594 if (queue_.empty()) {
595 drainSignalsLocked();
596 } else {
597 ensureSignalLocked();
598 }
599 }
600
601 template <typename MessageTT>
602 bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) {
603 checkPid();
604 bool signal = false;
605 {
606 auto data = std::make_unique<Node>(
607 std::forward<MessageTT>(message), RequestContext::saveContext());
608 folly::SpinLockGuard g(spinlock_);
609 if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) {
610 return false;
611 }
612 // We only need to signal an event if not all consumers are
613 // awake.
614 if (numActiveConsumers_ < numConsumers_) {
615 signal = true;
616 }
617 queue_.push_back(*data.release());
618 if (signal) {
619 ensureSignalLocked();
620 }
621 }
622 return true;
623 }
624
625 template <typename InputIteratorT>
626 void putMessagesImpl(
627 InputIteratorT first,
628 InputIteratorT last,
629 std::input_iterator_tag) {
630 checkPid();
631 bool signal = false;
632 boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q;
633 try {
634 while (first != last) {
635 auto data = std::make_unique<Node>(
636 std::move(*first), RequestContext::saveContext());
637 q.push_back(*data.release());
638 ++first;
639 }
640 folly::SpinLockGuard g(spinlock_);
641 checkDraining();
642 queue_.splice(queue_.end(), q);
643 if (numActiveConsumers_ < numConsumers_) {
644 signal = true;
645 }
646 if (signal) {
647 ensureSignalLocked();
648 }
649 } catch (...) {
650 std::unique_ptr<Node> data;
651 while (!q.empty()) {
652 data.reset(&q.front());
653 q.pop_front();
654 }
655 throw;
656 }
657 }
658
659 mutable folly::SpinLock spinlock_;
660 mutable bool signal_{false};
661 int eventfd_;
662 int pipeFds_[2]; // to fallback to on older/non-linux systems
663 uint32_t advisoryMaxQueueSize_;
664 pid_t pid_;
665 boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
666 int numConsumers_{0};
667 std::atomic<int> numActiveConsumers_{0};
668 bool draining_{false};
669};
670
671template <typename MessageT>
672void NotificationQueue<MessageT>::Consumer::destroy() {
673 // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_
674 // will be non-nullptr. Mark the value that it points to, so that
675 // handlerReady() will know the callback is destroyed, and that it cannot
676 // access any member variables anymore.
677 if (destroyedFlagPtr_) {
678 *destroyedFlagPtr_ = true;
679 }
680 stopConsuming();
681 DelayedDestruction::destroy();
682}
683
684template <typename MessageT>
685void NotificationQueue<MessageT>::Consumer::handlerReady(
686 uint16_t /*events*/) noexcept {
687 consumeMessages(false);
688}
689
690template <typename MessageT>
691void NotificationQueue<MessageT>::Consumer::consumeMessages(
692 bool isDrain,
693 size_t* numConsumed) noexcept {
694 DestructorGuard dg(this);
695 uint32_t numProcessed = 0;
696 setActive(true);
697 SCOPE_EXIT {
698 if (queue_) {
699 queue_->syncSignalAndQueue();
700 }
701 };
702 SCOPE_EXIT {
703 setActive(false, /* shouldLock = */ true);
704 };
705 SCOPE_EXIT {
706 if (numConsumed != nullptr) {
707 *numConsumed = numProcessed;
708 }
709 };
710 while (true) {
711 // Now pop the message off of the queue.
712 //
713 // We have to manually acquire and release the spinlock here, rather than
714 // using SpinLockHolder since the MessageT has to be constructed while
715 // holding the spinlock and available after we release it. SpinLockHolder
716 // unfortunately doesn't provide a release() method. (We can't construct
717 // MessageT first since we have no guarantee that MessageT has a default
718 // constructor.
719 queue_->spinlock_.lock();
720 bool locked = true;
721
722 try {
723 if (UNLIKELY(queue_->queue_.empty())) {
724 // If there is no message, we've reached the end of the queue, return.
725 setActive(false);
726 queue_->spinlock_.unlock();
727 return;
728 }
729
730 // Pull a message off the queue.
731 std::unique_ptr<Node> data;
732 data.reset(&queue_->queue_.front());
733 queue_->queue_.pop_front();
734
735 // Check to see if the queue is empty now.
736 // We use this as an optimization to see if we should bother trying to
737 // loop again and read another message after invoking this callback.
738 bool wasEmpty = queue_->queue_.empty();
739 if (wasEmpty) {
740 setActive(false);
741 }
742
743 // Now unlock the spinlock before we invoke the callback.
744 queue_->spinlock_.unlock();
745 RequestContextScopeGuard rctx(std::move(data->ctx_));
746
747 locked = false;
748
749 // Call the callback
750 bool callbackDestroyed = false;
751 CHECK(destroyedFlagPtr_ == nullptr);
752 destroyedFlagPtr_ = &callbackDestroyed;
753 messageAvailable(std::move(data->msg_));
754 destroyedFlagPtr_ = nullptr;
755
756 // If the callback was destroyed before it returned, we are done
757 if (callbackDestroyed) {
758 return;
759 }
760
761 // If the callback is no longer installed, we are done.
762 if (queue_ == nullptr) {
763 return;
764 }
765
766 // If we have hit maxReadAtOnce_, we are done.
767 ++numProcessed;
768 if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
769 return;
770 }
771
772 // If the queue was empty before we invoked the callback, it's probable
773 // that it is still empty now. Just go ahead and return, rather than
774 // looping again and trying to re-read from the eventfd. (If a new
775 // message had in fact arrived while we were invoking the callback, we
776 // will simply be woken up the next time around the event loop and will
777 // process the message then.)
778 if (wasEmpty) {
779 return;
780 }
781 } catch (const std::exception&) {
782 // This catch block is really just to handle the case where the MessageT
783 // constructor throws. The messageAvailable() callback itself is
784 // declared as noexcept and should never throw.
785 //
786 // If the MessageT constructor does throw we try to handle it as best as
787 // we can, but we can't work miracles. We will just ignore the error for
788 // now and return. The next time around the event loop we will end up
789 // trying to read the message again. If MessageT continues to throw we
790 // will never make forward progress and will keep trying each time around
791 // the event loop.
792 if (locked) {
793 // Unlock the spinlock.
794 queue_->spinlock_.unlock();
795 }
796
797 return;
798 }
799 }
800}
801
802template <typename MessageT>
803void NotificationQueue<MessageT>::Consumer::init(
804 EventBase* eventBase,
805 NotificationQueue* queue) {
806 eventBase->dcheckIsInEventBaseThread();
807 assert(queue_ == nullptr);
808 assert(!isHandlerRegistered());
809 queue->checkPid();
810
811 base_ = eventBase;
812
813 queue_ = queue;
814
815 {
816 folly::SpinLockGuard g(queue_->spinlock_);
817 queue_->numConsumers_++;
818 }
819 queue_->ensureSignal();
820
821 if (queue_->eventfd_ >= 0) {
822 initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->eventfd_));
823 } else {
824 initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->pipeFds_[0]));
825 }
826}
827
828template <typename MessageT>
829void NotificationQueue<MessageT>::Consumer::stopConsuming() {
830 if (queue_ == nullptr) {
831 assert(!isHandlerRegistered());
832 return;
833 }
834
835 {
836 folly::SpinLockGuard g(queue_->spinlock_);
837 queue_->numConsumers_--;
838 setActive(false);
839 }
840
841 assert(isHandlerRegistered());
842 unregisterHandler();
843 detachEventBase();
844 queue_ = nullptr;
845}
846
847template <typename MessageT>
848bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained(
849 size_t* numConsumed) noexcept {
850 DestructorGuard dg(this);
851 {
852 folly::SpinLockGuard g(queue_->spinlock_);
853 if (queue_->draining_) {
854 return false;
855 }
856 queue_->draining_ = true;
857 }
858 consumeMessages(true, numConsumed);
859 {
860 folly::SpinLockGuard g(queue_->spinlock_);
861 queue_->draining_ = false;
862 }
863 return true;
864}
865
866template <typename MessageT>
867template <typename F>
868void NotificationQueue<MessageT>::SimpleConsumer::consumeUntilDrained(
869 F&& foreach) {
870 SCOPE_EXIT {
871 queue_.syncSignalAndQueue();
872 };
873
874 queue_.checkPid();
875
876 while (true) {
877 std::unique_ptr<Node> data;
878 {
879 folly::SpinLockGuard g(queue_.spinlock_);
880
881 if (UNLIKELY(queue_.queue_.empty())) {
882 return;
883 }
884
885 data.reset(&queue_.queue_.front());
886 queue_.queue_.pop_front();
887 }
888
889 RequestContextScopeGuard rctx(std::move(data->ctx_));
890 foreach(std::move(data->msg_));
891 // Make sure message destructor is called with the correct RequestContext.
892 data.reset();
893 }
894}
895
896/**
897 * Creates a NotificationQueue::Consumer wrapping a function object
898 * Modeled after AsyncTimeout::make
899 *
900 */
901
902namespace detail {
903
904template <typename MessageT, typename TCallback>
905struct notification_queue_consumer_wrapper
906 : public NotificationQueue<MessageT>::Consumer {
907 template <typename UCallback>
908 explicit notification_queue_consumer_wrapper(UCallback&& callback)
909 : callback_(std::forward<UCallback>(callback)) {}
910
911 // we are being stricter here and requiring noexcept for callback
912 void messageAvailable(MessageT&& message) noexcept override {
913 static_assert(
914 noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))),
915 "callback must be declared noexcept, e.g.: `[]() noexcept {}`");
916
917 callback_(std::forward<MessageT>(message));
918 }
919
920 private:
921 TCallback callback_;
922};
923
924} // namespace detail
925
926template <typename MessageT>
927template <typename TCallback>
928std::unique_ptr<
929 typename NotificationQueue<MessageT>::Consumer,
930 DelayedDestruction::Destructor>
931NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) {
932 return std::unique_ptr<
933 NotificationQueue<MessageT>::Consumer,
934 DelayedDestruction::Destructor>(
935 new detail::notification_queue_consumer_wrapper<
936 MessageT,
937 typename std::decay<TCallback>::type>(
938 std::forward<TCallback>(callback)));
939}
940
941} // namespace folly
942