1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#pragma once
17
18#include <atomic>
19#include <cerrno>
20#include <cmath>
21#include <cstdlib>
22#include <functional>
23#include <list>
24#include <memory>
25#include <mutex>
26#include <queue>
27#include <set>
28#include <stack>
29#include <unordered_map>
30#include <unordered_set>
31#include <utility>
32
33#include <boost/intrusive/list.hpp>
34#include <glog/logging.h>
35
36#include <folly/Executor.h>
37#include <folly/Function.h>
38#include <folly/Portability.h>
39#include <folly/ScopeGuard.h>
40#include <folly/executors/DrivableExecutor.h>
41#include <folly/executors/IOExecutor.h>
42#include <folly/executors/ScheduledExecutor.h>
43#include <folly/executors/SequencedExecutor.h>
44#include <folly/experimental/ExecutionObserver.h>
45#include <folly/io/async/AsyncTimeout.h>
46#include <folly/io/async/HHWheelTimer.h>
47#include <folly/io/async/Request.h>
48#include <folly/io/async/TimeoutManager.h>
49#include <folly/portability/Event.h>
50#include <folly/synchronization/CallOnce.h>
51
52namespace folly {
53
54using Cob = Func; // defined in folly/Executor.h
55template <typename MessageT>
56class NotificationQueue;
57
58namespace detail {
59class EventBaseLocalBase;
60
61class EventBaseLocalBaseBase {
62 public:
63 virtual void onEventBaseDestruction(EventBase& evb) = 0;
64 virtual ~EventBaseLocalBaseBase() = default;
65};
66} // namespace detail
67template <typename T>
68class EventBaseLocal;
69
/**
 * Interface for objects that receive loop timing samples from an EventBase.
 * An observer is installed with EventBase::setObserver().
 */
class EventBaseObserver {
 public:
  virtual ~EventBaseObserver() = default;

  // Sampling rate requested by the observer.
  // NOTE(review): presumably "one sample every N loop iterations" -- confirm
  // against the EventBase implementation.
  virtual uint32_t getSampleRate() const = 0;

  // Delivers one sample consisting of a busy time and an idle time.
  // NOTE(review): units are not visible here (likely microseconds) -- confirm.
  virtual void loopSample(int64_t busyTime, int64_t idleTime) = 0;
};
78
79// Helper class that sets and retrieves the EventBase associated with a given
80// request via RequestContext. See Request.h for that mechanism.
81class RequestEventBase : public RequestData {
82 public:
83 static EventBase* get() {
84 auto data = dynamic_cast<RequestEventBase*>(
85 RequestContext::get()->getContextData(kContextDataName));
86 if (!data) {
87 return nullptr;
88 }
89 return data->eb_;
90 }
91
92 static void set(EventBase* eb) {
93 RequestContext::get()->setContextData(
94 kContextDataName,
95 std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
96 }
97
98 bool hasCallback() override {
99 return false;
100 }
101
102 private:
103 explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
104 EventBase* eb_;
105 static constexpr const char* kContextDataName{"EventBase"};
106};
107
108class VirtualEventBase;
109
110/**
111 * This class is a wrapper for all asynchronous I/O processing functionality
112 *
113 * EventBase provides a main loop that notifies EventHandler callback objects
114 * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
115 * when a specified timeout has expired. More complex, higher-level callback
116 * mechanisms can then be built on top of EventHandler and AsyncTimeout.
117 *
 * An EventBase object can only drive an event loop for a single thread. To
119 * take advantage of multiple CPU cores, most asynchronous I/O servers have one
120 * thread per CPU, and use a separate EventBase for each thread.
121 *
122 * In general, most EventBase methods may only be called from the thread
123 * running the EventBase's loop. There are a few exceptions to this rule, for
 * methods that are explicitly intended to allow communication with an
 * EventBase from other threads. When it is safe to call a method from
126 * another thread it is explicitly listed in the method comments.
127 */
128class EventBase : private boost::noncopyable,
129 public TimeoutManager,
130 public DrivableExecutor,
131 public IOExecutor,
132 public SequencedExecutor,
133 public ScheduledExecutor {
134 public:
135 using Func = folly::Function<void()>;
136
137 /**
138 * A callback interface to use with runInLoop()
139 *
140 * Derive from this class if you need to delay some code execution until the
141 * next iteration of the event loop. This allows you to schedule code to be
142 * invoked from the top-level of the loop, after your immediate callers have
143 * returned.
144 *
145 * If a LoopCallback object is destroyed while it is scheduled to be run in
146 * the next loop iteration, it will automatically be cancelled.
147 */
148 class LoopCallback
149 : public boost::intrusive::list_base_hook<
150 boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
151 public:
152 virtual ~LoopCallback() = default;
153
154 virtual void runLoopCallback() noexcept = 0;
155 void cancelLoopCallback() {
156 context_.reset();
157 unlink();
158 }
159
160 bool isLoopCallbackScheduled() const {
161 return is_linked();
162 }
163
164 private:
165 typedef boost::intrusive::
166 list<LoopCallback, boost::intrusive::constant_time_size<false>>
167 List;
168
169 // EventBase needs access to LoopCallbackList (and therefore to hook_)
170 friend class EventBase;
171 friend class VirtualEventBase;
172 std::shared_ptr<RequestContext> context_;
173 };
174
175 class FunctionLoopCallback : public LoopCallback {
176 public:
177 explicit FunctionLoopCallback(Func&& function)
178 : function_(std::move(function)) {}
179
180 void runLoopCallback() noexcept override {
181 function_();
182 delete this;
183 }
184
185 private:
186 Func function_;
187 };
188
189 // Like FunctionLoopCallback, but saves one allocation. Use with caution.
190 //
191 // The caller is responsible for maintaining the lifetime of this callback
192 // until after the point at which the contained function is called.
193 class StackFunctionLoopCallback : public LoopCallback {
194 public:
195 explicit StackFunctionLoopCallback(Func&& function)
196 : function_(std::move(function)) {}
197 void runLoopCallback() noexcept override {
198 Func(std::move(function_))();
199 }
200
201 private:
202 Func function_;
203 };
204
205 /**
206 * Create a new EventBase object.
207 *
208 * Same as EventBase(true), which constructs an EventBase that measures time.
209 */
210 EventBase() : EventBase(true) {}
211
212 /**
213 * Create a new EventBase object.
214 *
215 * @param enableTimeMeasurement Informs whether this event base should measure
216 * time. Disabling it would likely improve
217 * performance, but will disable some features
 * that rely on time-measurement, including:
219 * observer, max latency and avg loop time.
220 */
221 explicit EventBase(bool enableTimeMeasurement);
222
223 /**
224 * Create a new EventBase object that will use the specified libevent
225 * event_base object to drive the event loop.
226 *
227 * The EventBase will take ownership of this event_base, and will call
228 * event_base_free(evb) when the EventBase is destroyed.
229 *
230 * @param enableTimeMeasurement Informs whether this event base should measure
231 * time. Disabling it would likely improve
232 * performance, but will disable some features
 * that rely on time-measurement, including:
234 * observer, max latency and avg loop time.
235 */
236 explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
237 ~EventBase() override;
238
239 /**
240 * Runs the event loop.
241 *
242 * loop() will loop waiting for I/O or timeouts and invoking EventHandler
243 * and AsyncTimeout callbacks as their events become ready. loop() will
244 * only return when there are no more events remaining to process, or after
245 * terminateLoopSoon() has been called.
246 *
247 * loop() may be called again to restart event processing after a previous
248 * call to loop() or loopForever() has returned.
249 *
250 * Returns true if the loop completed normally (if it processed all
251 * outstanding requests, or if terminateLoopSoon() was called). If an error
252 * occurs waiting for events, false will be returned.
253 */
254 bool loop();
255
256 /**
257 * Same as loop(), but doesn't wait for all keep-alive tokens to be released.
258 */
259 [[deprecated("This should only be used in legacy unit tests")]] bool
260 loopIgnoreKeepAlive();
261
262 /**
263 * Wait for some events to become active, run them, then return.
264 *
265 * When EVLOOP_NONBLOCK is set in flags, the loop won't block if there
266 * are not any events to process.
267 *
268 * This is useful for callers that want to run the loop manually.
269 *
270 * Returns the same result as loop().
271 */
272 bool loopOnce(int flags = 0);
273
274 /**
275 * Runs the event loop.
276 *
 * loopForever() behaves like loop(), except that it keeps running even
 * when there are no more user-supplied EventHandlers or AsyncTimeouts
279 * registered. It will only return after terminateLoopSoon() has been
280 * called.
281 *
282 * This is useful for callers that want to wait for other threads to call
283 * runInEventBaseThread(), even when there are no other scheduled events.
284 *
285 * loopForever() may be called again to restart event processing after a
286 * previous call to loop() or loopForever() has returned.
287 *
288 * Throws a std::system_error if an error occurs.
289 */
290 void loopForever();
291
292 /**
293 * Causes the event loop to exit soon.
294 *
295 * This will cause an existing call to loop() or loopForever() to stop event
296 * processing and return, even if there are still events remaining to be
297 * processed.
298 *
299 * It is safe to call terminateLoopSoon() from another thread to cause loop()
300 * to wake up and return in the EventBase loop thread. terminateLoopSoon()
 * may also be called from the loop thread itself (for example, an
302 * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
303 * cause the loop to exit after the callback returns.) If the loop is not
304 * running, this will cause the next call to loop to terminate soon after
305 * starting. If a loop runs out of work (and so terminates on its own)
306 * concurrently with a call to terminateLoopSoon(), this may cause a race
307 * condition.
308 *
309 * Note that the caller is responsible for ensuring that cleanup of all event
310 * callbacks occurs properly. Since terminateLoopSoon() causes the loop to
311 * exit even when there are pending events present, there may be remaining
312 * callbacks present waiting to be invoked. If the loop is later restarted
313 * pending events will continue to be processed normally, however if the
314 * EventBase is destroyed after calling terminateLoopSoon() it is the
315 * caller's responsibility to ensure that cleanup happens properly even if
316 * some outstanding events are never processed.
317 */
318 void terminateLoopSoon();
319
320 /**
321 * Adds the given callback to a queue of things run after the current pass
322 * through the event loop completes. Note that if this callback calls
323 * runInLoop() the new callback won't be called until the main event loop
324 * has gone through a cycle.
325 *
326 * This method may only be called from the EventBase's thread. This
327 * essentially allows an event handler to schedule an additional callback to
328 * be invoked after it returns.
329 *
330 * Use runInEventBaseThread() to schedule functions from another thread.
331 *
332 * The thisIteration parameter makes this callback run in this loop
333 * iteration, instead of the next one, even if called from a
334 * runInLoop callback (normal io callbacks that call runInLoop will
335 * always run in this iteration). This was originally added to
336 * support detachEventBase, as a user callback may have called
337 * terminateLoopSoon(), but we want to make sure we detach. Also,
338 * detachEventBase almost always must be called from the base event
339 * loop to ensure the stack is unwound, since most users of
340 * EventBase are not thread safe.
341 *
342 * Ideally we would not need thisIteration, and instead just use
343 * runInLoop with loop() (instead of terminateLoopSoon).
344 */
345 void runInLoop(LoopCallback* callback, bool thisIteration = false);
346
347 /**
348 * Convenience function to call runInLoop() with a folly::Function.
349 *
350 * This creates a LoopCallback object to wrap the folly::Function, and invoke
351 * the folly::Function when the loop callback fires. This is slightly more
352 * expensive than defining your own LoopCallback, but more convenient in
353 * areas that aren't too performance sensitive.
354 *
355 * This method may only be called from the EventBase's thread. This
356 * essentially allows an event handler to schedule an additional callback to
357 * be invoked after it returns.
358 *
359 * Use runInEventBaseThread() to schedule functions from another thread.
360 */
361 void runInLoop(Func c, bool thisIteration = false);
362
363 /**
364 * Adds the given callback to a queue of things run before destruction
365 * of current EventBase.
366 *
367 * This allows users of EventBase that run in it, but don't control it,
368 * to be notified before EventBase gets destructed.
369 *
370 * Note: will be called from the thread that invoked EventBase destructor,
371 * before the final run of loop callbacks.
372 */
373 void runOnDestruction(LoopCallback* callback);
374
375 /**
376 * Adds a callback that will run immediately *before* the event loop.
377 * This is very similar to runInLoop(), but will not cause the loop to break:
378 * For example, this callback could be used to get loop times.
379 */
380 void runBeforeLoop(LoopCallback* callback);
381
382 /**
383 * Run the specified function in the EventBase's thread.
384 *
385 * This method is thread-safe, and may be called from another thread.
386 *
387 * If runInEventBaseThread() is called when the EventBase loop is not
388 * running, the function call will be delayed until the next time the loop is
389 * started.
390 *
391 * If runInEventBaseThread() returns true the function has successfully been
392 * scheduled to run in the loop thread. However, if the loop is terminated
393 * (and never later restarted) before it has a chance to run the requested
394 * function, the function will be run upon the EventBase's destruction.
395 *
396 * If two calls to runInEventBaseThread() are made from the same thread, the
397 * functions will always be run in the order that they were scheduled.
398 * Ordering between functions scheduled from separate threads is not
399 * guaranteed.
400 *
401 * @param fn The function to run. The function must not throw any
402 * exceptions.
403 * @param arg An argument to pass to the function.
404 *
405 * @return Returns true if the function was successfully scheduled, or false
406 * if there was an error scheduling the function.
407 */
408 template <typename T>
409 bool runInEventBaseThread(void (*fn)(T*), T* arg);
410
411 /**
412 * Run the specified function in the EventBase's thread
413 *
414 * This version of runInEventBaseThread() takes a folly::Function object.
 * Note that this may be less efficient than the version that takes a plain
 * function pointer and T* argument, if moving the function is expensive
417 * (e.g., if it wraps a lambda which captures some values with expensive move
418 * constructors).
419 *
420 * If the loop is terminated (and never later restarted) before it has a
421 * chance to run the requested function, the function will be run upon the
422 * EventBase's destruction.
423 *
424 * The function must not throw any exceptions.
425 */
426 bool runInEventBaseThread(Func fn);
427
428 /**
429 * Run the specified function in the EventBase's thread.
430 *
431 * This method is thread-safe, and may be called from another thread.
432 *
433 * If runInEventBaseThreadAlwaysEnqueue() is called when the EventBase loop is
434 * not running, the function call will be delayed until the next time the loop
435 * is started.
436 *
437 * If runInEventBaseThreadAlwaysEnqueue() returns true the function has
438 * successfully been scheduled to run in the loop thread. However, if the
439 * loop is terminated (and never later restarted) before it has a chance to
440 * run the requested function, the function will be run upon the EventBase's
441 * destruction.
442 *
443 * If two calls to runInEventBaseThreadAlwaysEnqueue() are made from the same
444 * thread, the functions will always be run in the order that they were
445 * scheduled. Ordering between functions scheduled from separate threads is
 * not guaranteed. If a call is made from the EventBase thread, the function
 * will not be executed inline and will be queued to the same queue as if the
 * call had been made from a different thread.
449 *
450 * @param fn The function to run. The function must not throw any
451 * exceptions.
452 * @param arg An argument to pass to the function.
453 *
454 * @return Returns true if the function was successfully scheduled, or false
455 * if there was an error scheduling the function.
456 */
457 template <typename T>
458 bool runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg);
459
460 /**
461 * Run the specified function in the EventBase's thread
462 *
463 * This version of runInEventBaseThreadAlwaysEnqueue() takes a folly::Function
 * object. Note that this may be less efficient than the version that takes a
 * plain function pointer and T* argument, if moving the function is
466 * expensive (e.g., if it wraps a lambda which captures some values with
467 * expensive move constructors).
468 *
469 * If the loop is terminated (and never later restarted) before it has a
470 * chance to run the requested function, the function will be run upon the
471 * EventBase's destruction.
472 *
473 * The function must not throw any exceptions.
474 */
475 bool runInEventBaseThreadAlwaysEnqueue(Func fn);
476
477 /*
478 * Like runInEventBaseThread, but the caller waits for the callback to be
479 * executed.
480 */
481 template <typename T>
482 bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
483
484 /*
485 * Like runInEventBaseThread, but the caller waits for the callback to be
486 * executed.
487 */
488 bool runInEventBaseThreadAndWait(Func fn);
489
490 /*
491 * Like runInEventBaseThreadAndWait, except if the caller is already in the
492 * event base thread, the functor is simply run inline.
493 */
494 template <typename T>
495 bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
496
497 /*
498 * Like runInEventBaseThreadAndWait, except if the caller is already in the
499 * event base thread, the functor is simply run inline.
500 */
501 bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
502
503 /**
504 * Set the maximum desired latency in us and provide a callback which will be
505 * called when that latency is exceeded.
506 * OBS: This functionality depends on time-measurement.
507 */
508 void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) {
509 assert(enableTimeMeasurement_);
510 maxLatency_ = maxLatency;
511 maxLatencyCob_ = std::move(maxLatencyCob);
512 }
513
514 /**
515 * Set smoothing coefficient for loop load average; # of milliseconds
516 * for exp(-1) (1/2.71828...) decay.
517 */
518 void setLoadAvgMsec(std::chrono::milliseconds ms);
519
520 /**
521 * reset the load average to a desired value
522 */
523 void resetLoadAvg(double value = 0.0);
524
525 /**
 * Get the average loop time in microseconds (an exponentially-smoothed
 * average).
527 */
528 double getAvgLoopTime() const {
529 assert(enableTimeMeasurement_);
530 return avgLoopTime_.get();
531 }
532
533 /**
534 * check if the event base loop is running.
535 */
536 bool isRunning() const {
537 return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
538 }
539
540 /**
541 * wait until the event loop starts (after starting the event loop thread).
542 */
543 void waitUntilRunning();
544
545 size_t getNotificationQueueSize() const;
546
547 void setMaxReadAtOnce(uint32_t maxAtOnce);
548
549 /**
550 * Verify that current thread is the EventBase thread, if the EventBase is
551 * running.
552 */
553 bool isInEventBaseThread() const {
554 auto tid = loopThread_.load(std::memory_order_relaxed);
555 return tid == std::thread::id() || tid == std::this_thread::get_id();
556 }
557
558 bool inRunningEventBaseThread() const {
559 return loopThread_.load(std::memory_order_relaxed) ==
560 std::this_thread::get_id();
561 }
562
563 /**
564 * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
565 * dcheckIsInEventBaseThread), but it prints more information on
566 * failure.
567 */
568 void checkIsInEventBaseThread() const;
569 void dcheckIsInEventBaseThread() const {
570 if (kIsDebug) {
571 checkIsInEventBaseThread();
572 }
573 }
574
575 HHWheelTimer& timer() {
576 if (!wheelTimer_) {
577 wheelTimer_ = HHWheelTimer::newTimer(this);
578 }
579 return *wheelTimer_.get();
580 }
581
582 // --------- interface to underlying libevent base ------------
583 // Avoid using these functions if possible. These functions are not
584 // guaranteed to always be present if we ever provide alternative EventBase
585 // implementations that do not use libevent internally.
586 event_base* getLibeventBase() const {
587 return evb_;
588 }
589 static const char* getLibeventVersion();
590 static const char* getLibeventMethod();
591
592 /**
593 * only EventHandler/AsyncTimeout subclasses and ourselves should
594 * ever call this.
595 *
596 * This is used to mark the beginning of a new loop cycle by the
597 * first handler fired within that cycle.
598 *
599 */
600 void bumpHandlingTime() final;
601
602 class SmoothLoopTime {
603 public:
604 explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
605 : expCoeff_(-1.0 / timeInterval.count()), value_(0.0) {
606 VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
607 }
608
609 void setTimeInterval(std::chrono::microseconds timeInterval);
610 void reset(double value = 0.0);
611
612 void addSample(
613 std::chrono::microseconds total,
614 std::chrono::microseconds busy);
615
616 double get() const {
617 // Add the outstanding buffered times linearly, to avoid
618 // expensive exponentiation
619 auto lcoeff = buffer_time_.count() * -expCoeff_;
620 return value_ * (1.0 - lcoeff) + lcoeff * busy_buffer_.count();
621 }
622
623 void dampen(double factor) {
624 value_ *= factor;
625 }
626
627 private:
628 double expCoeff_;
629 double value_;
630 std::chrono::microseconds buffer_time_{0};
631 std::chrono::microseconds busy_buffer_{0};
632 std::size_t buffer_cnt_{0};
633 static constexpr std::chrono::milliseconds buffer_interval_{10};
634 };
635
636 void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
637 assert(enableTimeMeasurement_);
638 observer_ = observer;
639 }
640
641 const std::shared_ptr<EventBaseObserver>& getObserver() {
642 return observer_;
643 }
644
645 /**
646 * Setup execution observation/instrumentation for every EventHandler
647 * executed in this EventBase.
648 *
 * @param observer EventHandler's execution observer.
650 */
651 void setExecutionObserver(ExecutionObserver* observer) {
652 executionObserver_ = observer;
653 }
654
655 /**
656 * Gets the execution observer associated with this EventBase.
657 */
658 ExecutionObserver* getExecutionObserver() {
659 return executionObserver_;
660 }
661
662 /**
663 * Set the name of the thread that runs this event base.
664 */
665 void setName(const std::string& name);
666
667 /**
668 * Returns the name of the thread that runs this event base.
669 */
670 const std::string& getName();
671
672 /// Implements the Executor interface
673 void add(Cob fn) override {
674 // runInEventBaseThread() takes a const&,
675 // so no point in doing std::move here.
676 runInEventBaseThread(std::move(fn));
677 }
678
679 /// Implements the DrivableExecutor interface
680 void drive() override {
681 ++loopKeepAliveCount_;
682 SCOPE_EXIT {
683 --loopKeepAliveCount_;
684 };
685 loopOnce();
686 }
687
688 // Implements the ScheduledExecutor interface
689 void scheduleAt(Func&& fn, TimePoint const& timeout) override;
690
691 // TimeoutManager
692 void attachTimeoutManager(
693 AsyncTimeout* obj,
694 TimeoutManager::InternalEnum internal) final;
695
696 void detachTimeoutManager(AsyncTimeout* obj) final;
697
698 bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
699 final;
700
701 void cancelTimeout(AsyncTimeout* obj) final;
702
703 bool isInTimeoutManagerThread() final {
704 return isInEventBaseThread();
705 }
706
707 // Returns a VirtualEventBase attached to this EventBase. Can be used to
708 // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
709 // destroyed together with the EventBase.
710 //
 // Any number of VirtualEventBase instances backed by this EventBase may be
 // constructed independently. This method should only be used if you don't
 // need to manage the lifetime of the VirtualEventBase used.
714 folly::VirtualEventBase& getVirtualEventBase();
715
716 /// Implements the IOExecutor interface
717 EventBase* getEventBase() override;
718
719 protected:
720 bool keepAliveAcquire() override {
721 if (inRunningEventBaseThread()) {
722 loopKeepAliveCount_++;
723 } else {
724 loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
725 }
726 return true;
727 }
728
729 void keepAliveRelease() override {
730 if (!inRunningEventBaseThread()) {
731 return add([this] { loopKeepAliveCount_--; });
732 }
733 loopKeepAliveCount_--;
734 }
735
736 private:
737 void applyLoopKeepAlive();
738
739 ssize_t loopKeepAliveCount();
740
741 /*
742 * Helper function that tells us whether we have already handled
743 * some event/timeout/callback in this loop iteration.
744 */
745 bool nothingHandledYet() const noexcept;
746
747 typedef LoopCallback::List LoopCallbackList;
748 class FunctionRunner;
749
750 bool loopBody(int flags = 0, bool ignoreKeepAlive = false);
751
752 // executes any callbacks queued by runInLoop(); returns false if none found
753 bool runLoopCallbacks();
754
755 void initNotificationQueue();
756
757 // should only be accessed through public getter
758 HHWheelTimer::UniquePtr wheelTimer_;
759
760 LoopCallbackList loopCallbacks_;
761 LoopCallbackList runBeforeLoopCallbacks_;
762 LoopCallbackList onDestructionCallbacks_;
763
764 // This will be null most of the time, but point to currentCallbacks
765 // if we are in the middle of running loop callbacks, such that
766 // runInLoop(..., true) will always run in the current loop
767 // iteration.
768 LoopCallbackList* runOnceCallbacks_;
769
770 // stop_ is set by terminateLoopSoon() and is used by the main loop
771 // to determine if it should exit
772 std::atomic<bool> stop_;
773
774 // The ID of the thread running the main loop.
775 // std::thread::id{} if loop is not running.
776 std::atomic<std::thread::id> loopThread_;
777
778 // pointer to underlying event_base class doing the heavy lifting
779 event_base* evb_;
780
781 // A notification queue for runInEventBaseThread() to use
782 // to send function requests to the EventBase thread.
783 std::unique_ptr<NotificationQueue<Func>> queue_;
784 std::unique_ptr<FunctionRunner> fnRunner_;
785 ssize_t loopKeepAliveCount_{0};
786 std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
787 bool loopKeepAliveActive_{false};
788
789 // limit for latency in microseconds (0 disables)
790 std::chrono::microseconds maxLatency_;
791
792 // exponentially-smoothed average loop time for latency-limiting
793 SmoothLoopTime avgLoopTime_;
794
795 // smoothed loop time used to invoke latency callbacks; differs from
796 // avgLoopTime_ in that it's scaled down after triggering a callback
797 // to reduce spamminess
798 SmoothLoopTime maxLatencyLoopTime_;
799
800 // callback called when latency limit is exceeded
801 Func maxLatencyCob_;
802
803 // Enables/disables time measurements in loopBody(). if disabled, the
804 // following functionality that relies on time-measurement, will not
805 // be supported: avg loop time, observer and max latency.
806 const bool enableTimeMeasurement_;
807
808 // Wrap-around loop counter to detect beginning of each loop
809 std::size_t nextLoopCnt_;
810 std::size_t latestLoopCnt_;
811 std::chrono::steady_clock::time_point startWork_;
812 // Prevent undefined behavior from invoking event_base_loop() reentrantly.
813 // This is needed since many projects use libevent-1.4, which lacks commit
814 // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
815 // https://github.com/libevent/libevent/.
816 bool invokingLoop_{false};
817
818 // Observer to export counters
819 std::shared_ptr<EventBaseObserver> observer_;
820 uint32_t observerSampleCount_;
821
822 // EventHandler's execution observer.
823 ExecutionObserver* executionObserver_;
824
825 // Name of the thread running this EventBase
826 std::string name_;
827
 // allow runOnDestruction() to be called from any thread
829 std::mutex onDestructionCallbacksMutex_;
830
831 // see EventBaseLocal
832 friend class detail::EventBaseLocalBase;
833 template <typename T>
834 friend class EventBaseLocal;
835 std::unordered_map<std::size_t, std::shared_ptr<void>> localStorage_;
836 std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
837
838 folly::once_flag virtualEventBaseInitFlag_;
839 std::unique_ptr<VirtualEventBase> virtualEventBase_;
840};
841
842template <typename T>
843bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) {
844 return runInEventBaseThread([=] { fn(arg); });
845}
846
847template <typename T>
848bool EventBase::runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg) {
849 return runInEventBaseThreadAlwaysEnqueue([=] { fn(arg); });
850}
851
852template <typename T>
853bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
854 return runInEventBaseThreadAndWait([=] { fn(arg); });
855}
856
857template <typename T>
858bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
859 void (*fn)(T*),
860 T* arg) {
861 return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
862}
863
864} // namespace folly
865