1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <sys/types.h> |
20 | |
21 | #include <algorithm> |
22 | #include <iterator> |
23 | #include <memory> |
24 | #include <stdexcept> |
25 | #include <utility> |
26 | |
27 | #include <boost/intrusive/slist.hpp> |
28 | #include <folly/Exception.h> |
29 | #include <folly/FileUtil.h> |
30 | #include <folly/Likely.h> |
31 | #include <folly/ScopeGuard.h> |
32 | #include <folly/SpinLock.h> |
33 | #include <folly/io/async/DelayedDestruction.h> |
34 | #include <folly/io/async/EventBase.h> |
35 | #include <folly/io/async/EventHandler.h> |
36 | #include <folly/io/async/Request.h> |
37 | #include <folly/portability/Fcntl.h> |
38 | #include <folly/portability/Sockets.h> |
39 | #include <folly/portability/Unistd.h> |
40 | |
41 | #include <glog/logging.h> |
42 | |
43 | #if __linux__ && !__ANDROID__ |
44 | #define FOLLY_HAVE_EVENTFD |
45 | #include <folly/io/async/EventFDWrapper.h> |
46 | #endif |
47 | |
48 | namespace folly { |
49 | |
50 | /** |
51 | * A producer-consumer queue for passing messages between EventBase threads. |
52 | * |
53 | * Messages can be added to the queue from any thread. Multiple consumers may |
54 | * listen to the queue from multiple EventBase threads. |
55 | * |
56 | * A NotificationQueue may not be destroyed while there are still consumers |
57 | * registered to receive events from the queue. It is the user's |
58 | * responsibility to ensure that all consumers are unregistered before the |
59 | * queue is destroyed. |
60 | * |
 * MessageT should be MoveConstructible (i.e., it must support either a move
 * constructor or a copy constructor, or both). Ideally its move constructor
 * (or copy constructor if no move constructor is provided) should never throw
 * exceptions. If the constructor may throw, the consumers could end up
 * spinning: trying to move a message off the queue, failing, and then
 * retrying.
67 | */ |
68 | template <typename MessageT> |
69 | class NotificationQueue { |
70 | struct Node : public boost::intrusive::slist_base_hook< |
71 | boost::intrusive::cache_last<true>> { |
72 | template <typename MessageTT> |
73 | Node(MessageTT&& msg, std::shared_ptr<RequestContext> ctx) |
74 | : msg_(std::forward<MessageTT>(msg)), ctx_(std::move(ctx)) {} |
75 | MessageT msg_; |
76 | std::shared_ptr<RequestContext> ctx_; |
77 | }; |
78 | |
79 | public: |
80 | /** |
81 | * A callback interface for consuming messages from the queue as they arrive. |
82 | */ |
83 | class Consumer : public DelayedDestruction, private EventHandler { |
84 | public: |
85 | enum : uint16_t { kDefaultMaxReadAtOnce = 10 }; |
86 | |
87 | Consumer() |
88 | : queue_(nullptr), |
89 | destroyedFlagPtr_(nullptr), |
90 | maxReadAtOnce_(kDefaultMaxReadAtOnce) {} |
91 | |
92 | // create a consumer in-place, without the need to build new class |
93 | template <typename TCallback> |
94 | static std::unique_ptr<Consumer, DelayedDestruction::Destructor> make( |
95 | TCallback&& callback); |
96 | |
97 | /** |
98 | * messageAvailable() will be invoked whenever a new |
99 | * message is available from the pipe. |
100 | */ |
101 | virtual void messageAvailable(MessageT&& message) noexcept = 0; |
102 | |
103 | /** |
104 | * Begin consuming messages from the specified queue. |
105 | * |
106 | * messageAvailable() will be called whenever a message is available. This |
107 | * consumer will continue to consume messages until stopConsuming() is |
108 | * called. |
109 | * |
110 | * A Consumer may only consume messages from a single NotificationQueue at |
111 | * a time. startConsuming() should not be called if this consumer is |
112 | * already consuming. |
113 | */ |
114 | void startConsuming(EventBase* eventBase, NotificationQueue* queue) { |
115 | init(eventBase, queue); |
116 | registerHandler(READ | PERSIST); |
117 | } |
118 | |
119 | /** |
120 | * Same as above but registers this event handler as internal so that it |
121 | * doesn't count towards the pending reader count for the IOLoop. |
122 | */ |
123 | void startConsumingInternal( |
124 | EventBase* eventBase, |
125 | NotificationQueue* queue) { |
126 | init(eventBase, queue); |
127 | registerInternalHandler(READ | PERSIST); |
128 | } |
129 | |
130 | /** |
131 | * Stop consuming messages. |
132 | * |
133 | * startConsuming() may be called again to resume consumption of messages |
134 | * at a later point in time. |
135 | */ |
136 | void stopConsuming(); |
137 | |
138 | /** |
139 | * Consume messages off the queue until it is empty. No messages may be |
140 | * added to the queue while it is draining, so that the process is bounded. |
141 | * To that end, putMessage/tryPutMessage will throw an std::runtime_error, |
142 | * and tryPutMessageNoThrow will return false. |
143 | * |
144 | * @returns true if the queue was drained, false otherwise. In practice, |
145 | * this will only fail if someone else is already draining the queue. |
146 | */ |
147 | bool consumeUntilDrained(size_t* numConsumed = nullptr) noexcept; |
148 | |
149 | /** |
150 | * Get the NotificationQueue that this consumer is currently consuming |
151 | * messages from. Returns nullptr if the consumer is not currently |
152 | * consuming events from any queue. |
153 | */ |
154 | NotificationQueue* getCurrentQueue() const { |
155 | return queue_; |
156 | } |
157 | |
158 | /** |
159 | * Set a limit on how many messages this consumer will read each iteration |
160 | * around the event loop. |
161 | * |
162 | * This helps rate-limit how much work the Consumer will do each event loop |
163 | * iteration, to prevent it from starving other event handlers. |
164 | * |
165 | * A limit of 0 means no limit will be enforced. If unset, the limit |
166 | * defaults to kDefaultMaxReadAtOnce (defined to 10 above). |
167 | */ |
168 | void setMaxReadAtOnce(uint32_t maxAtOnce) { |
169 | maxReadAtOnce_ = maxAtOnce; |
170 | } |
171 | uint32_t getMaxReadAtOnce() const { |
172 | return maxReadAtOnce_; |
173 | } |
174 | |
175 | EventBase* getEventBase() { |
176 | return base_; |
177 | } |
178 | |
179 | void handlerReady(uint16_t events) noexcept override; |
180 | |
181 | protected: |
182 | void destroy() override; |
183 | |
184 | ~Consumer() override {} |
185 | |
186 | private: |
187 | /** |
188 | * Consume messages off the the queue until |
189 | * - the queue is empty (1), or |
190 | * - until the consumer is destroyed, or |
191 | * - until the consumer is uninstalled, or |
192 | * - an exception is thrown in the course of dequeueing, or |
193 | * - unless isDrain is true, until the maxReadAtOnce_ limit is hit |
194 | * |
195 | * (1) Well, maybe. See logic/comments around "wasEmpty" in implementation. |
196 | */ |
197 | void consumeMessages(bool isDrain, size_t* numConsumed = nullptr) noexcept; |
198 | |
199 | void setActive(bool active, bool shouldLock = false) { |
200 | if (!queue_) { |
201 | active_ = active; |
202 | return; |
203 | } |
204 | if (shouldLock) { |
205 | queue_->spinlock_.lock(); |
206 | } |
207 | if (!active_ && active) { |
208 | ++queue_->numActiveConsumers_; |
209 | } else if (active_ && !active) { |
210 | --queue_->numActiveConsumers_; |
211 | } |
212 | active_ = active; |
213 | if (shouldLock) { |
214 | queue_->spinlock_.unlock(); |
215 | } |
216 | } |
217 | void init(EventBase* eventBase, NotificationQueue* queue); |
218 | |
219 | NotificationQueue* queue_; |
220 | bool* destroyedFlagPtr_; |
221 | uint32_t maxReadAtOnce_; |
222 | EventBase* base_; |
223 | bool active_{false}; |
224 | }; |
225 | |
226 | class SimpleConsumer { |
227 | public: |
228 | explicit SimpleConsumer(NotificationQueue& queue) : queue_(queue) { |
229 | ++queue_.numConsumers_; |
230 | } |
231 | |
232 | ~SimpleConsumer() { |
233 | --queue_.numConsumers_; |
234 | } |
235 | |
236 | int getFd() const { |
237 | return queue_.eventfd_ >= 0 ? queue_.eventfd_ : queue_.pipeFds_[0]; |
238 | } |
239 | |
240 | template <typename F> |
241 | void consumeUntilDrained(F&& foreach); |
242 | |
243 | private: |
244 | NotificationQueue& queue_; |
245 | }; |
246 | |
// Kind of file descriptor the queue uses to signal that messages are
// available. Passed to the NotificationQueue constructor.
enum class FdType {
  // A pipe pair; available on every platform this header builds on.
  PIPE,
#ifdef FOLLY_HAVE_EVENTFD
  // An eventfd; only offered when FOLLY_HAVE_EVENTFD is defined.
  EVENTFD,
#endif
};
253 | |
254 | /** |
255 | * Create a new NotificationQueue. |
256 | * |
257 | * If the maxSize parameter is specified, this sets the maximum queue size |
258 | * that will be enforced by tryPutMessage(). (This size is advisory, and may |
259 | * be exceeded if producers explicitly use putMessage() instead of |
260 | * tryPutMessage().) |
261 | * |
262 | * The fdType parameter determines the type of file descriptor used |
263 | * internally to signal message availability. The default (eventfd) is |
264 | * preferable for performance and because it won't fail when the queue gets |
265 | * too long. It is not available on on older and non-linux kernels, however. |
266 | * In this case the code will fall back to using a pipe, the parameter is |
267 | * mostly for testing purposes. |
268 | */ |
269 | explicit NotificationQueue( |
270 | uint32_t maxSize = 0, |
271 | #ifdef FOLLY_HAVE_EVENTFD |
272 | FdType fdType = FdType::EVENTFD) |
273 | #else |
274 | FdType fdType = FdType::PIPE) |
275 | #endif |
276 | : eventfd_(-1), |
277 | pipeFds_{-1, -1}, |
278 | advisoryMaxQueueSize_(maxSize), |
279 | pid_(pid_t(getpid())) { |
280 | |
281 | #ifdef FOLLY_HAVE_EVENTFD |
282 | if (fdType == FdType::EVENTFD) { |
283 | eventfd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); |
284 | if (eventfd_ == -1) { |
285 | if (errno == ENOSYS || errno == EINVAL) { |
286 | // eventfd not availalble |
287 | LOG(ERROR) << "failed to create eventfd for NotificationQueue: " |
288 | << errno << ", falling back to pipe mode (is your kernel " |
289 | << "> 2.6.30?)" ; |
290 | fdType = FdType::PIPE; |
291 | } else { |
292 | // some other error |
293 | folly::throwSystemError( |
294 | "Failed to create eventfd for " |
295 | "NotificationQueue" , |
296 | errno); |
297 | } |
298 | } |
299 | } |
300 | #endif |
301 | if (fdType == FdType::PIPE) { |
302 | if (pipe(pipeFds_)) { |
303 | folly::throwSystemError( |
304 | "Failed to create pipe for NotificationQueue" , errno); |
305 | } |
306 | try { |
307 | // put both ends of the pipe into non-blocking mode |
308 | if (fcntl(pipeFds_[0], F_SETFL, O_RDONLY | O_NONBLOCK) != 0) { |
309 | folly::throwSystemError( |
310 | "failed to put NotificationQueue pipe read " |
311 | "endpoint into non-blocking mode" , |
312 | errno); |
313 | } |
314 | if (fcntl(pipeFds_[1], F_SETFL, O_WRONLY | O_NONBLOCK) != 0) { |
315 | folly::throwSystemError( |
316 | "failed to put NotificationQueue pipe write " |
317 | "endpoint into non-blocking mode" , |
318 | errno); |
319 | } |
320 | } catch (...) { |
321 | ::close(pipeFds_[0]); |
322 | ::close(pipeFds_[1]); |
323 | throw; |
324 | } |
325 | } |
326 | } |
327 | |
328 | ~NotificationQueue() { |
329 | std::unique_ptr<Node> data; |
330 | while (!queue_.empty()) { |
331 | data.reset(&queue_.front()); |
332 | queue_.pop_front(); |
333 | } |
334 | if (eventfd_ >= 0) { |
335 | ::close(eventfd_); |
336 | eventfd_ = -1; |
337 | } |
338 | if (pipeFds_[0] >= 0) { |
339 | ::close(pipeFds_[0]); |
340 | pipeFds_[0] = -1; |
341 | } |
342 | if (pipeFds_[1] >= 0) { |
343 | ::close(pipeFds_[1]); |
344 | pipeFds_[1] = -1; |
345 | } |
346 | } |
347 | |
348 | /** |
349 | * Set the advisory maximum queue size. |
350 | * |
351 | * This maximum queue size affects calls to tryPutMessage(). Message |
352 | * producers can still use the putMessage() call to unconditionally put a |
353 | * message on the queue, ignoring the configured maximum queue size. This |
354 | * can cause the queue size to exceed the configured maximum. |
355 | */ |
356 | void setMaxQueueSize(uint32_t max) { |
357 | advisoryMaxQueueSize_ = max; |
358 | } |
359 | |
360 | /** |
361 | * Attempt to put a message on the queue if the queue is not already full. |
362 | * |
363 | * If the queue is full, a std::overflow_error will be thrown. The |
364 | * setMaxQueueSize() function controls the maximum queue size. |
365 | * |
366 | * If the queue is currently draining, an std::runtime_error will be thrown. |
367 | * |
368 | * This method may contend briefly on a spinlock if many threads are |
369 | * concurrently accessing the queue, but for all intents and purposes it will |
370 | * immediately place the message on the queue and return. |
371 | * |
372 | * tryPutMessage() may throw std::bad_alloc if memory allocation fails, and |
373 | * may throw any other exception thrown by the MessageT move/copy |
374 | * constructor. |
375 | */ |
376 | template <typename MessageTT> |
377 | void tryPutMessage(MessageTT&& message) { |
378 | putMessageImpl(std::forward<MessageTT>(message), advisoryMaxQueueSize_); |
379 | } |
380 | |
381 | /** |
382 | * No-throw versions of the above. Instead returns true on success, false on |
383 | * failure. |
384 | * |
385 | * Only std::overflow_error (the common exception case) and std::runtime_error |
386 | * (which indicates that the queue is being drained) are prevented from being |
387 | * thrown. User code must still catch std::bad_alloc errors. |
388 | */ |
389 | template <typename MessageTT> |
390 | bool tryPutMessageNoThrow(MessageTT&& message) { |
391 | return putMessageImpl( |
392 | std::forward<MessageTT>(message), advisoryMaxQueueSize_, false); |
393 | } |
394 | |
/**
 * Put a message on the queue regardless of the advisory maximum size.
 *
 * Behaves like tryPutMessage() with the size check disabled, so the queue
 * may exceed its configured maximum.
 *
 * putMessage() may throw
 *   - std::bad_alloc if memory allocation fails,
 *   - std::runtime_error if the queue is currently draining,
 *   - any other exception thrown by the MessageT move/copy constructor.
 */
template <typename MessageTT>
void putMessage(MessageTT&& message) {
  // A limit of 0 switches the size check off.
  constexpr size_t kUnlimited = 0;
  putMessageImpl(std::forward<MessageTT>(message), kUnlimited);
}
411 | |
/**
 * Put every message in the range [first, last) on the queue.
 *
 * The call dispatches on the iterator's category tag to the matching
 * putMessagesImpl() overload.
 */
template <typename InputIteratorT>
void putMessages(InputIteratorT first, InputIteratorT last) {
  using CategoryTag =
      typename std::iterator_traits<InputIteratorT>::iterator_category;
  putMessagesImpl(first, last, CategoryTag{});
}
421 | |
422 | /** |
423 | * Try to immediately pull a message off of the queue, without blocking. |
424 | * |
425 | * If a message is immediately available, the result parameter will be |
426 | * updated to contain the message contents and true will be returned. |
427 | * |
428 | * If no message is available, false will be returned and result will be left |
429 | * unmodified. |
430 | */ |
431 | bool tryConsume(MessageT& result) { |
432 | SCOPE_EXIT { |
433 | syncSignalAndQueue(); |
434 | }; |
435 | |
436 | checkPid(); |
437 | std::unique_ptr<Node> data; |
438 | |
439 | { |
440 | folly::SpinLockGuard g(spinlock_); |
441 | |
442 | if (UNLIKELY(queue_.empty())) { |
443 | return false; |
444 | } |
445 | |
446 | data.reset(&queue_.front()); |
447 | queue_.pop_front(); |
448 | } |
449 | |
450 | result = std::move(data->msg_); |
451 | RequestContext::setContext(std::move(data->ctx_)); |
452 | |
453 | return true; |
454 | } |
455 | |
456 | size_t size() const { |
457 | folly::SpinLockGuard g(spinlock_); |
458 | return queue_.size(); |
459 | } |
460 | |
/**
 * Check that the NotificationQueue is being used from the correct process.
 *
 * If you create a NotificationQueue in one process, then fork, and try to
 * send messages to the queue from the child process, you're going to have a
 * bad time. Unfortunately users have (accidentally) run into this.
 *
 * Because we use an eventfd/pipe, the child process can actually signal the
 * parent process that an event is ready. However, it can't put anything on
 * the parent's queue, so the parent wakes up and finds an empty queue. This
 * check ensures that we catch the problem in the misbehaving child process
 * code, and crash before signalling the parent process.
 *
 * The comparison is against pid_, which the constructor sets from getpid().
 */
void checkPid() const {
  // CHECK_EQ is fatal on mismatch.
  CHECK_EQ(pid_, pid_t(getpid()));
}
477 | |
478 | private: |
// Forbidden copy constructor and assignment operator: both are deleted, so a
// NotificationQueue cannot be copied.
NotificationQueue(NotificationQueue const&) = delete;
NotificationQueue& operator=(NotificationQueue const&) = delete;
482 | |
483 | inline bool checkQueueSize(size_t maxSize, bool throws = true) const { |
484 | DCHECK(0 == spinlock_.try_lock()); |
485 | if (maxSize > 0 && queue_.size() >= maxSize) { |
486 | if (throws) { |
487 | throw std::overflow_error( |
488 | "unable to add message to NotificationQueue: " |
489 | "queue is full" ); |
490 | } |
491 | return false; |
492 | } |
493 | return true; |
494 | } |
495 | |
496 | inline bool checkDraining(bool throws = true) { |
497 | if (UNLIKELY(draining_ && throws)) { |
498 | throw std::runtime_error("queue is draining, cannot add message" ); |
499 | } |
500 | return draining_; |
501 | } |
502 | |
#ifdef __ANDROID__
// TODO 10860938 Remove after figuring out crash
// Diagnostic counters, compiled only on Android. eventBytes_ is increased by
// the bytes written in ensureSignalLocked() and decreased by the bytes read
// in drainSignalsLocked(); maxEventBytes_ keeps the largest value seen.
mutable std::atomic<int> eventBytes_{0};
mutable std::atomic<int> maxEventBytes_{0};
#endif
508 | |
509 | void ensureSignalLocked() const { |
510 | // semantics: empty fd == empty queue <=> !signal_ |
511 | if (signal_) { |
512 | return; |
513 | } |
514 | |
515 | ssize_t bytes_written = 0; |
516 | size_t bytes_expected = 0; |
517 | |
518 | do { |
519 | if (eventfd_ >= 0) { |
520 | // eventfd(2) dictates that we must write a 64-bit integer |
521 | uint64_t signal = 1; |
522 | bytes_expected = sizeof(signal); |
523 | bytes_written = ::write(eventfd_, &signal, bytes_expected); |
524 | } else { |
525 | uint8_t signal = 1; |
526 | bytes_expected = sizeof(signal); |
527 | bytes_written = ::write(pipeFds_[1], &signal, bytes_expected); |
528 | } |
529 | } while (bytes_written == -1 && errno == EINTR); |
530 | |
531 | #ifdef __ANDROID__ |
532 | if (bytes_written > 0) { |
533 | eventBytes_ += bytes_written; |
534 | maxEventBytes_ = std::max((int)maxEventBytes_, (int)eventBytes_); |
535 | } |
536 | #endif |
537 | |
538 | if (bytes_written == ssize_t(bytes_expected)) { |
539 | signal_ = true; |
540 | } else { |
541 | #ifdef __ANDROID__ |
542 | LOG(ERROR) << "NotificationQueue Write Error=" << errno |
543 | << " bytesInPipe=" << eventBytes_ |
544 | << " maxInPipe=" << maxEventBytes_ << " queue=" << size(); |
545 | #endif |
546 | folly::throwSystemError( |
547 | "failed to signal NotificationQueue after " |
548 | "write" , |
549 | errno); |
550 | } |
551 | } |
552 | |
553 | void drainSignalsLocked() { |
554 | ssize_t bytes_read = 0; |
555 | if (eventfd_ > 0) { |
556 | uint64_t message; |
557 | bytes_read = readNoInt(eventfd_, &message, sizeof(message)); |
558 | CHECK(bytes_read != -1 || errno == EAGAIN); |
559 | } else { |
560 | // There should only be one byte in the pipe. To avoid potential leaks we |
561 | // still drain. |
562 | uint8_t message[32]; |
563 | ssize_t result; |
564 | while ((result = readNoInt(pipeFds_[0], &message, sizeof(message))) != |
565 | -1) { |
566 | bytes_read += result; |
567 | } |
568 | CHECK(result == -1 && errno == EAGAIN); |
569 | LOG_IF(ERROR, bytes_read > 1) |
570 | << "[NotificationQueue] Unexpected state while draining pipe: bytes_read=" |
571 | << bytes_read << " bytes, expected <= 1" ; |
572 | } |
573 | LOG_IF(ERROR, (signal_ && bytes_read == 0) || (!signal_ && bytes_read > 0)) |
574 | << "[NotificationQueue] Unexpected state while draining signals: signal_=" |
575 | << signal_ << " bytes_read=" << bytes_read; |
576 | |
577 | signal_ = false; |
578 | |
579 | #ifdef __ANDROID__ |
580 | if (bytes_read > 0) { |
581 | eventBytes_ -= bytes_read; |
582 | } |
583 | #endif |
584 | } |
585 | |
586 | void ensureSignal() const { |
587 | folly::SpinLockGuard g(spinlock_); |
588 | ensureSignalLocked(); |
589 | } |
590 | |
591 | void syncSignalAndQueue() { |
592 | folly::SpinLockGuard g(spinlock_); |
593 | |
594 | if (queue_.empty()) { |
595 | drainSignalsLocked(); |
596 | } else { |
597 | ensureSignalLocked(); |
598 | } |
599 | } |
600 | |
601 | template <typename MessageTT> |
602 | bool putMessageImpl(MessageTT&& message, size_t maxSize, bool throws = true) { |
603 | checkPid(); |
604 | bool signal = false; |
605 | { |
606 | auto data = std::make_unique<Node>( |
607 | std::forward<MessageTT>(message), RequestContext::saveContext()); |
608 | folly::SpinLockGuard g(spinlock_); |
609 | if (checkDraining(throws) || !checkQueueSize(maxSize, throws)) { |
610 | return false; |
611 | } |
612 | // We only need to signal an event if not all consumers are |
613 | // awake. |
614 | if (numActiveConsumers_ < numConsumers_) { |
615 | signal = true; |
616 | } |
617 | queue_.push_back(*data.release()); |
618 | if (signal) { |
619 | ensureSignalLocked(); |
620 | } |
621 | } |
622 | return true; |
623 | } |
624 | |
625 | template <typename InputIteratorT> |
626 | void putMessagesImpl( |
627 | InputIteratorT first, |
628 | InputIteratorT last, |
629 | std::input_iterator_tag) { |
630 | checkPid(); |
631 | bool signal = false; |
632 | boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> q; |
633 | try { |
634 | while (first != last) { |
635 | auto data = std::make_unique<Node>( |
636 | std::move(*first), RequestContext::saveContext()); |
637 | q.push_back(*data.release()); |
638 | ++first; |
639 | } |
640 | folly::SpinLockGuard g(spinlock_); |
641 | checkDraining(); |
642 | queue_.splice(queue_.end(), q); |
643 | if (numActiveConsumers_ < numConsumers_) { |
644 | signal = true; |
645 | } |
646 | if (signal) { |
647 | ensureSignalLocked(); |
648 | } |
649 | } catch (...) { |
650 | std::unique_ptr<Node> data; |
651 | while (!q.empty()) { |
652 | data.reset(&q.front()); |
653 | q.pop_front(); |
654 | } |
655 | throw; |
656 | } |
657 | } |
658 | |
// Lock taken around accesses to queue_, signal_ and draining_.
mutable folly::SpinLock spinlock_;
// True while a wake-up token is believed to be pending on the descriptor;
// set by ensureSignalLocked(), cleared by drainSignalsLocked().
mutable bool signal_{false};
// eventfd used for notification, or -1 when the pipe is used instead.
int eventfd_;
int pipeFds_[2]; // to fallback to on older/non-linux systems
// Limit applied by tryPutMessage()/tryPutMessageNoThrow(); 0 means no limit.
uint32_t advisoryMaxQueueSize_;
// Process id recorded at construction, compared against in checkPid().
pid_t pid_;
// The pending messages, in FIFO order (push_back / pop_front).
boost::intrusive::slist<Node, boost::intrusive::cache_last<true>> queue_;
// Number of consumers currently attached to this queue.
int numConsumers_{0};
// Number of attached consumers currently marked active (Consumer::setActive).
std::atomic<int> numActiveConsumers_{0};
// True while Consumer::consumeUntilDrained() is running; producers refuse
// new messages during that time.
bool draining_{false};
669 | }; |
670 | |
671 | template <typename MessageT> |
672 | void NotificationQueue<MessageT>::Consumer::destroy() { |
673 | // If we are in the middle of a call to handlerReady(), destroyedFlagPtr_ |
674 | // will be non-nullptr. Mark the value that it points to, so that |
675 | // handlerReady() will know the callback is destroyed, and that it cannot |
676 | // access any member variables anymore. |
677 | if (destroyedFlagPtr_) { |
678 | *destroyedFlagPtr_ = true; |
679 | } |
680 | stopConsuming(); |
681 | DelayedDestruction::destroy(); |
682 | } |
683 | |
684 | template <typename MessageT> |
685 | void NotificationQueue<MessageT>::Consumer::handlerReady( |
686 | uint16_t /*events*/) noexcept { |
687 | consumeMessages(false); |
688 | } |
689 | |
/**
 * Pop messages off the queue and pass each one to messageAvailable().
 *
 * @param isDrain      When true the maxReadAtOnce_ limit is ignored.
 * @param numConsumed  If non-null, receives the number of messages for which
 *                     the callback completed and the consumer was still
 *                     alive and installed afterwards.
 *
 * The loop ends when the queue is found empty, when the message just handled
 * was the last one in the queue, when the callback destroyed or uninstalled
 * this consumer, when the per-pass limit is reached, or when a
 * std::exception is caught.
 */
template <typename MessageT>
void NotificationQueue<MessageT>::Consumer::consumeMessages(
    bool isDrain,
    size_t* numConsumed) noexcept {
  // Keeps `this` alive until the function returns, even if the callback
  // calls destroy().
  DestructorGuard dg(this);
  uint32_t numProcessed = 0;
  setActive(true);
  // The three guards below run in reverse order of declaration on return:
  // publish the count, mark the consumer inactive (taking the lock), and
  // finally re-synchronise the notification descriptor with the queue.
  SCOPE_EXIT {
    // queue_ is nullptr if the callback uninstalled this consumer.
    if (queue_) {
      queue_->syncSignalAndQueue();
    }
  };
  SCOPE_EXIT {
    setActive(false, /* shouldLock = */ true);
  };
  SCOPE_EXIT {
    if (numConsumed != nullptr) {
      *numConsumed = numProcessed;
    }
  };
  while (true) {
    // Now pop the message off of the queue.
    //
    // We have to manually acquire and release the spinlock here, rather than
    // using SpinLockHolder since the MessageT has to be constructed while
    // holding the spinlock and available after we release it. SpinLockHolder
    // unfortunately doesn't provide a release() method. (We can't construct
    // MessageT first since we have no guarantee that MessageT has a default
    // constructor.)
    queue_->spinlock_.lock();
    // Tracks whether the catch block still has to release the spinlock.
    bool locked = true;

    try {
      if (UNLIKELY(queue_->queue_.empty())) {
        // If there is no message, we've reached the end of the queue, return.
        setActive(false);
        queue_->spinlock_.unlock();
        return;
      }

      // Pull a message off the queue. The unique_ptr takes ownership of the
      // node, so it is freed when this iteration ends.
      std::unique_ptr<Node> data;
      data.reset(&queue_->queue_.front());
      queue_->queue_.pop_front();

      // Check to see if the queue is empty now.
      // We use this as an optimization to see if we should bother trying to
      // loop again and read another message after invoking this callback.
      bool wasEmpty = queue_->queue_.empty();
      if (wasEmpty) {
        // Mark inactive while still holding the lock.
        setActive(false);
      }

      // Now unlock the spinlock before we invoke the callback.
      queue_->spinlock_.unlock();
      // Install the context stored with the message for the callback.
      // NOTE(review): `locked` is still true while this guard is
      // constructed, although the lock has already been released; if the
      // constructor could throw a std::exception the catch block would
      // unlock a second time -- confirm it cannot throw.
      RequestContextScopeGuard rctx(std::move(data->ctx_));

      locked = false;

      // Call the callback. While it runs, destroyedFlagPtr_ points at a
      // local flag that destroy() sets if the consumer is destroyed from
      // inside the callback.
      bool callbackDestroyed = false;
      CHECK(destroyedFlagPtr_ == nullptr);
      destroyedFlagPtr_ = &callbackDestroyed;
      messageAvailable(std::move(data->msg_));
      destroyedFlagPtr_ = nullptr;

      // If the callback was destroyed before it returned, we are done
      if (callbackDestroyed) {
        return;
      }

      // If the callback is no longer installed, we are done.
      if (queue_ == nullptr) {
        return;
      }

      // If we have hit maxReadAtOnce_, we are done.
      ++numProcessed;
      if (!isDrain && maxReadAtOnce_ > 0 && numProcessed >= maxReadAtOnce_) {
        return;
      }

      // If the queue was empty before we invoked the callback, it's probable
      // that it is still empty now. Just go ahead and return, rather than
      // looping again and trying to re-read from the eventfd. (If a new
      // message had in fact arrived while we were invoking the callback, we
      // will simply be woken up the next time around the event loop and will
      // process the message then.)
      if (wasEmpty) {
        return;
      }
    } catch (const std::exception&) {
      // This catch block is really just to handle the case where the MessageT
      // constructor throws. The messageAvailable() callback itself is
      // declared as noexcept and should never throw.
      //
      // If the MessageT constructor does throw we try to handle it as best as
      // we can, but we can't work miracles. We will just ignore the error for
      // now and return. The next time around the event loop we will end up
      // trying to read the message again. If MessageT continues to throw we
      // will never make forward progress and will keep trying each time around
      // the event loop.
      //
      // NOTE(review): the try block above moves the message into the callback
      // and does not appear to construct a MessageT itself; the rationale
      // above may be out of date -- confirm.
      if (locked) {
        // Unlock the spinlock.
        queue_->spinlock_.unlock();
      }

      return;
    }
  }
}
801 | |
802 | template <typename MessageT> |
803 | void NotificationQueue<MessageT>::Consumer::init( |
804 | EventBase* eventBase, |
805 | NotificationQueue* queue) { |
806 | eventBase->dcheckIsInEventBaseThread(); |
807 | assert(queue_ == nullptr); |
808 | assert(!isHandlerRegistered()); |
809 | queue->checkPid(); |
810 | |
811 | base_ = eventBase; |
812 | |
813 | queue_ = queue; |
814 | |
815 | { |
816 | folly::SpinLockGuard g(queue_->spinlock_); |
817 | queue_->numConsumers_++; |
818 | } |
819 | queue_->ensureSignal(); |
820 | |
821 | if (queue_->eventfd_ >= 0) { |
822 | initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->eventfd_)); |
823 | } else { |
824 | initHandler(eventBase, folly::NetworkSocket::fromFd(queue_->pipeFds_[0])); |
825 | } |
826 | } |
827 | |
828 | template <typename MessageT> |
829 | void NotificationQueue<MessageT>::Consumer::stopConsuming() { |
830 | if (queue_ == nullptr) { |
831 | assert(!isHandlerRegistered()); |
832 | return; |
833 | } |
834 | |
835 | { |
836 | folly::SpinLockGuard g(queue_->spinlock_); |
837 | queue_->numConsumers_--; |
838 | setActive(false); |
839 | } |
840 | |
841 | assert(isHandlerRegistered()); |
842 | unregisterHandler(); |
843 | detachEventBase(); |
844 | queue_ = nullptr; |
845 | } |
846 | |
847 | template <typename MessageT> |
848 | bool NotificationQueue<MessageT>::Consumer::consumeUntilDrained( |
849 | size_t* numConsumed) noexcept { |
850 | DestructorGuard dg(this); |
851 | { |
852 | folly::SpinLockGuard g(queue_->spinlock_); |
853 | if (queue_->draining_) { |
854 | return false; |
855 | } |
856 | queue_->draining_ = true; |
857 | } |
858 | consumeMessages(true, numConsumed); |
859 | { |
860 | folly::SpinLockGuard g(queue_->spinlock_); |
861 | queue_->draining_ = false; |
862 | } |
863 | return true; |
864 | } |
865 | |
866 | template <typename MessageT> |
867 | template <typename F> |
868 | void NotificationQueue<MessageT>::SimpleConsumer::consumeUntilDrained( |
869 | F&& foreach) { |
870 | SCOPE_EXIT { |
871 | queue_.syncSignalAndQueue(); |
872 | }; |
873 | |
874 | queue_.checkPid(); |
875 | |
876 | while (true) { |
877 | std::unique_ptr<Node> data; |
878 | { |
879 | folly::SpinLockGuard g(queue_.spinlock_); |
880 | |
881 | if (UNLIKELY(queue_.queue_.empty())) { |
882 | return; |
883 | } |
884 | |
885 | data.reset(&queue_.queue_.front()); |
886 | queue_.queue_.pop_front(); |
887 | } |
888 | |
889 | RequestContextScopeGuard rctx(std::move(data->ctx_)); |
890 | foreach(std::move(data->msg_)); |
891 | // Make sure message destructor is called with the correct RequestContext. |
892 | data.reset(); |
893 | } |
894 | } |
895 | |
896 | /** |
897 | * Creates a NotificationQueue::Consumer wrapping a function object |
898 | * Modeled after AsyncTimeout::make |
899 | * |
900 | */ |
901 | |
902 | namespace detail { |
903 | |
904 | template <typename MessageT, typename TCallback> |
905 | struct notification_queue_consumer_wrapper |
906 | : public NotificationQueue<MessageT>::Consumer { |
907 | template <typename UCallback> |
908 | explicit notification_queue_consumer_wrapper(UCallback&& callback) |
909 | : callback_(std::forward<UCallback>(callback)) {} |
910 | |
911 | // we are being stricter here and requiring noexcept for callback |
912 | void messageAvailable(MessageT&& message) noexcept override { |
913 | static_assert( |
914 | noexcept(std::declval<TCallback>()(std::forward<MessageT>(message))), |
915 | "callback must be declared noexcept, e.g.: `[]() noexcept {}`" ); |
916 | |
917 | callback_(std::forward<MessageT>(message)); |
918 | } |
919 | |
920 | private: |
921 | TCallback callback_; |
922 | }; |
923 | |
924 | } // namespace detail |
925 | |
926 | template <typename MessageT> |
927 | template <typename TCallback> |
928 | std::unique_ptr< |
929 | typename NotificationQueue<MessageT>::Consumer, |
930 | DelayedDestruction::Destructor> |
931 | NotificationQueue<MessageT>::Consumer::make(TCallback&& callback) { |
932 | return std::unique_ptr< |
933 | NotificationQueue<MessageT>::Consumer, |
934 | DelayedDestruction::Destructor>( |
935 | new detail::notification_queue_consumer_wrapper< |
936 | MessageT, |
937 | typename std::decay<TCallback>::type>( |
938 | std::forward<TCallback>(callback))); |
939 | } |
940 | |
941 | } // namespace folly |
942 | |