1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | #pragma once |
17 | |
18 | #include <atomic> |
19 | #include <cerrno> |
20 | #include <cmath> |
21 | #include <cstdlib> |
22 | #include <functional> |
23 | #include <list> |
24 | #include <memory> |
25 | #include <mutex> |
26 | #include <queue> |
27 | #include <set> |
28 | #include <stack> |
29 | #include <unordered_map> |
30 | #include <unordered_set> |
31 | #include <utility> |
32 | |
33 | #include <boost/intrusive/list.hpp> |
34 | #include <glog/logging.h> |
35 | |
36 | #include <folly/Executor.h> |
37 | #include <folly/Function.h> |
38 | #include <folly/Portability.h> |
39 | #include <folly/ScopeGuard.h> |
40 | #include <folly/executors/DrivableExecutor.h> |
41 | #include <folly/executors/IOExecutor.h> |
42 | #include <folly/executors/ScheduledExecutor.h> |
43 | #include <folly/executors/SequencedExecutor.h> |
44 | #include <folly/experimental/ExecutionObserver.h> |
45 | #include <folly/io/async/AsyncTimeout.h> |
46 | #include <folly/io/async/HHWheelTimer.h> |
47 | #include <folly/io/async/Request.h> |
48 | #include <folly/io/async/TimeoutManager.h> |
49 | #include <folly/portability/Event.h> |
50 | #include <folly/synchronization/CallOnce.h> |
51 | |
52 | namespace folly { |
53 | |
54 | using Cob = Func; // defined in folly/Executor.h |
55 | template <typename MessageT> |
56 | class NotificationQueue; |
57 | |
58 | namespace detail { |
59 | class EventBaseLocalBase; |
60 | |
// Abstract interface exposing a single hook, onEventBaseDestruction().
// EventBase keeps a set of pointers to this type (localStorageToDtor_).
// NOTE(review): presumably the hook is invoked for each registered object
// when an EventBase is destroyed -- the call site is not in this header.
class EventBaseLocalBaseBase {
 public:
  // Hook receiving the EventBase concerned; implemented by subclasses.
  virtual void onEventBaseDestruction(EventBase& evb) = 0;
  // Virtual so implementations can be destroyed through this interface.
  virtual ~EventBaseLocalBaseBase() = default;
};
66 | } // namespace detail |
67 | template <typename T> |
68 | class EventBaseLocal; |
69 | |
// Observer interface that can be installed on an EventBase through
// EventBase::setObserver(). EventBase documents the observer as one of the
// features that depend on time measurement.
class EventBaseObserver {
 public:
  virtual ~EventBaseObserver() = default;

  // Sampling rate requested by the observer.
  // NOTE(review): presumably the number of loop iterations between two
  // loopSample() calls -- confirm against EventBase.cpp.
  virtual uint32_t getSampleRate() const = 0;

  // Receives one sample made of a busy time and an idle time.
  // NOTE(review): units are not visible in this header (likely
  // microseconds) -- confirm against the caller.
  virtual void loopSample(int64_t busyTime, int64_t idleTime) = 0;
};
78 | |
79 | // Helper class that sets and retrieves the EventBase associated with a given |
80 | // request via RequestContext. See Request.h for that mechanism. |
81 | class RequestEventBase : public RequestData { |
82 | public: |
83 | static EventBase* get() { |
84 | auto data = dynamic_cast<RequestEventBase*>( |
85 | RequestContext::get()->getContextData(kContextDataName)); |
86 | if (!data) { |
87 | return nullptr; |
88 | } |
89 | return data->eb_; |
90 | } |
91 | |
92 | static void set(EventBase* eb) { |
93 | RequestContext::get()->setContextData( |
94 | kContextDataName, |
95 | std::unique_ptr<RequestEventBase>(new RequestEventBase(eb))); |
96 | } |
97 | |
98 | bool hasCallback() override { |
99 | return false; |
100 | } |
101 | |
102 | private: |
103 | explicit RequestEventBase(EventBase* eb) : eb_(eb) {} |
104 | EventBase* eb_; |
105 | static constexpr const char* kContextDataName{"EventBase" }; |
106 | }; |
107 | |
108 | class VirtualEventBase; |
109 | |
110 | /** |
111 | * This class is a wrapper for all asynchronous I/O processing functionality |
112 | * |
113 | * EventBase provides a main loop that notifies EventHandler callback objects |
114 | * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects |
115 | * when a specified timeout has expired. More complex, higher-level callback |
116 | * mechanisms can then be built on top of EventHandler and AsyncTimeout. |
117 | * |
 * An EventBase object can only drive an event loop for a single thread. To
 * take advantage of multiple CPU cores, most asynchronous I/O servers have one
 * thread per CPU, and use a separate EventBase for each thread.
121 | * |
 * In general, most EventBase methods may only be called from the thread
 * running the EventBase's loop. There are a few exceptions to this rule, for
 * methods that are explicitly intended to allow communication with an
 * EventBase from other threads. When it is safe to call a method from
 * another thread, it is explicitly listed in the method comments.
127 | */ |
128 | class EventBase : private boost::noncopyable, |
129 | public TimeoutManager, |
130 | public DrivableExecutor, |
131 | public IOExecutor, |
132 | public SequencedExecutor, |
133 | public ScheduledExecutor { |
134 | public: |
135 | using Func = folly::Function<void()>; |
136 | |
137 | /** |
138 | * A callback interface to use with runInLoop() |
139 | * |
140 | * Derive from this class if you need to delay some code execution until the |
141 | * next iteration of the event loop. This allows you to schedule code to be |
142 | * invoked from the top-level of the loop, after your immediate callers have |
143 | * returned. |
144 | * |
145 | * If a LoopCallback object is destroyed while it is scheduled to be run in |
146 | * the next loop iteration, it will automatically be cancelled. |
147 | */ |
148 | class LoopCallback |
149 | : public boost::intrusive::list_base_hook< |
150 | boost::intrusive::link_mode<boost::intrusive::auto_unlink>> { |
151 | public: |
152 | virtual ~LoopCallback() = default; |
153 | |
154 | virtual void runLoopCallback() noexcept = 0; |
155 | void cancelLoopCallback() { |
156 | context_.reset(); |
157 | unlink(); |
158 | } |
159 | |
160 | bool isLoopCallbackScheduled() const { |
161 | return is_linked(); |
162 | } |
163 | |
164 | private: |
165 | typedef boost::intrusive:: |
166 | list<LoopCallback, boost::intrusive::constant_time_size<false>> |
167 | List; |
168 | |
169 | // EventBase needs access to LoopCallbackList (and therefore to hook_) |
170 | friend class EventBase; |
171 | friend class VirtualEventBase; |
172 | std::shared_ptr<RequestContext> context_; |
173 | }; |
174 | |
// LoopCallback that owns a Func and deletes itself after running it once.
//
// Because runLoopCallback() ends with `delete this`, instances must be
// allocated with new and must not be used after the callback has run. This
// class contains no other delete, so an instance that never runs is not
// freed by this class.
class FunctionLoopCallback : public LoopCallback {
 public:
  // Takes ownership of the function by moving it into function_.
  explicit FunctionLoopCallback(Func&& function)
      : function_(std::move(function)) {}

  // Invokes the stored function, then destroys this object. The method is
  // noexcept, so an exception escaping function_ terminates the program.
  void runLoopCallback() noexcept override {
    function_();
    // Self-destruct: nothing may touch this object past this line.
    delete this;
  }

 private:
  Func function_;
};
188 | |
189 | // Like FunctionLoopCallback, but saves one allocation. Use with caution. |
190 | // |
191 | // The caller is responsible for maintaining the lifetime of this callback |
192 | // until after the point at which the contained function is called. |
193 | class StackFunctionLoopCallback : public LoopCallback { |
194 | public: |
195 | explicit StackFunctionLoopCallback(Func&& function) |
196 | : function_(std::move(function)) {} |
197 | void runLoopCallback() noexcept override { |
198 | Func(std::move(function_))(); |
199 | } |
200 | |
201 | private: |
202 | Func function_; |
203 | }; |
204 | |
/**
 * Create a new EventBase object.
 *
 * Delegates to EventBase(true), i.e. constructs an EventBase that measures
 * time.
 */
EventBase() : EventBase(true) {}
211 | |
212 | /** |
213 | * Create a new EventBase object. |
214 | * |
 * @param enableTimeMeasurement Informs whether this event base should measure
 *                              time. Disabling it would likely improve
 *                              performance, but will disable some features
 *                              that rely on time-measurement, including:
 *                              observer, max latency and avg loop time.
220 | */ |
221 | explicit EventBase(bool enableTimeMeasurement); |
222 | |
223 | /** |
224 | * Create a new EventBase object that will use the specified libevent |
225 | * event_base object to drive the event loop. |
226 | * |
227 | * The EventBase will take ownership of this event_base, and will call |
228 | * event_base_free(evb) when the EventBase is destroyed. |
229 | * |
 * @param enableTimeMeasurement Informs whether this event base should measure
 *                              time. Disabling it would likely improve
 *                              performance, but will disable some features
 *                              that rely on time-measurement, including:
 *                              observer, max latency and avg loop time.
235 | */ |
236 | explicit EventBase(event_base* evb, bool enableTimeMeasurement = true); |
237 | ~EventBase() override; |
238 | |
239 | /** |
240 | * Runs the event loop. |
241 | * |
242 | * loop() will loop waiting for I/O or timeouts and invoking EventHandler |
243 | * and AsyncTimeout callbacks as their events become ready. loop() will |
244 | * only return when there are no more events remaining to process, or after |
245 | * terminateLoopSoon() has been called. |
246 | * |
247 | * loop() may be called again to restart event processing after a previous |
248 | * call to loop() or loopForever() has returned. |
249 | * |
250 | * Returns true if the loop completed normally (if it processed all |
251 | * outstanding requests, or if terminateLoopSoon() was called). If an error |
252 | * occurs waiting for events, false will be returned. |
253 | */ |
254 | bool loop(); |
255 | |
256 | /** |
257 | * Same as loop(), but doesn't wait for all keep-alive tokens to be released. |
258 | */ |
259 | [[deprecated("This should only be used in legacy unit tests" )]] bool |
260 | loopIgnoreKeepAlive(); |
261 | |
262 | /** |
263 | * Wait for some events to become active, run them, then return. |
264 | * |
265 | * When EVLOOP_NONBLOCK is set in flags, the loop won't block if there |
266 | * are not any events to process. |
267 | * |
268 | * This is useful for callers that want to run the loop manually. |
269 | * |
270 | * Returns the same result as loop(). |
271 | */ |
272 | bool loopOnce(int flags = 0); |
273 | |
274 | /** |
275 | * Runs the event loop. |
276 | * |
 * loopForever() behaves like loop(), except that it keeps running even
 * when there are no more user-supplied EventHandlers or AsyncTimeouts
 * registered. It will only return after terminateLoopSoon() has been
 * called.
281 | * |
282 | * This is useful for callers that want to wait for other threads to call |
283 | * runInEventBaseThread(), even when there are no other scheduled events. |
284 | * |
285 | * loopForever() may be called again to restart event processing after a |
286 | * previous call to loop() or loopForever() has returned. |
287 | * |
288 | * Throws a std::system_error if an error occurs. |
289 | */ |
290 | void loopForever(); |
291 | |
292 | /** |
293 | * Causes the event loop to exit soon. |
294 | * |
295 | * This will cause an existing call to loop() or loopForever() to stop event |
296 | * processing and return, even if there are still events remaining to be |
297 | * processed. |
298 | * |
299 | * It is safe to call terminateLoopSoon() from another thread to cause loop() |
300 | * to wake up and return in the EventBase loop thread. terminateLoopSoon() |
301 | * may also be called from the loop thread itself (for example, a |
302 | * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to |
303 | * cause the loop to exit after the callback returns.) If the loop is not |
304 | * running, this will cause the next call to loop to terminate soon after |
305 | * starting. If a loop runs out of work (and so terminates on its own) |
306 | * concurrently with a call to terminateLoopSoon(), this may cause a race |
307 | * condition. |
308 | * |
309 | * Note that the caller is responsible for ensuring that cleanup of all event |
310 | * callbacks occurs properly. Since terminateLoopSoon() causes the loop to |
311 | * exit even when there are pending events present, there may be remaining |
312 | * callbacks present waiting to be invoked. If the loop is later restarted |
313 | * pending events will continue to be processed normally, however if the |
314 | * EventBase is destroyed after calling terminateLoopSoon() it is the |
315 | * caller's responsibility to ensure that cleanup happens properly even if |
316 | * some outstanding events are never processed. |
317 | */ |
318 | void terminateLoopSoon(); |
319 | |
320 | /** |
321 | * Adds the given callback to a queue of things run after the current pass |
322 | * through the event loop completes. Note that if this callback calls |
323 | * runInLoop() the new callback won't be called until the main event loop |
324 | * has gone through a cycle. |
325 | * |
326 | * This method may only be called from the EventBase's thread. This |
327 | * essentially allows an event handler to schedule an additional callback to |
328 | * be invoked after it returns. |
329 | * |
330 | * Use runInEventBaseThread() to schedule functions from another thread. |
331 | * |
332 | * The thisIteration parameter makes this callback run in this loop |
333 | * iteration, instead of the next one, even if called from a |
334 | * runInLoop callback (normal io callbacks that call runInLoop will |
335 | * always run in this iteration). This was originally added to |
336 | * support detachEventBase, as a user callback may have called |
337 | * terminateLoopSoon(), but we want to make sure we detach. Also, |
338 | * detachEventBase almost always must be called from the base event |
339 | * loop to ensure the stack is unwound, since most users of |
340 | * EventBase are not thread safe. |
341 | * |
342 | * Ideally we would not need thisIteration, and instead just use |
343 | * runInLoop with loop() (instead of terminateLoopSoon). |
344 | */ |
345 | void runInLoop(LoopCallback* callback, bool thisIteration = false); |
346 | |
347 | /** |
348 | * Convenience function to call runInLoop() with a folly::Function. |
349 | * |
350 | * This creates a LoopCallback object to wrap the folly::Function, and invoke |
351 | * the folly::Function when the loop callback fires. This is slightly more |
352 | * expensive than defining your own LoopCallback, but more convenient in |
353 | * areas that aren't too performance sensitive. |
354 | * |
355 | * This method may only be called from the EventBase's thread. This |
356 | * essentially allows an event handler to schedule an additional callback to |
357 | * be invoked after it returns. |
358 | * |
359 | * Use runInEventBaseThread() to schedule functions from another thread. |
360 | */ |
361 | void runInLoop(Func c, bool thisIteration = false); |
362 | |
363 | /** |
364 | * Adds the given callback to a queue of things run before destruction |
365 | * of current EventBase. |
366 | * |
367 | * This allows users of EventBase that run in it, but don't control it, |
368 | * to be notified before EventBase gets destructed. |
369 | * |
370 | * Note: will be called from the thread that invoked EventBase destructor, |
371 | * before the final run of loop callbacks. |
372 | */ |
373 | void runOnDestruction(LoopCallback* callback); |
374 | |
375 | /** |
376 | * Adds a callback that will run immediately *before* the event loop. |
377 | * This is very similar to runInLoop(), but will not cause the loop to break: |
378 | * For example, this callback could be used to get loop times. |
379 | */ |
380 | void runBeforeLoop(LoopCallback* callback); |
381 | |
382 | /** |
383 | * Run the specified function in the EventBase's thread. |
384 | * |
385 | * This method is thread-safe, and may be called from another thread. |
386 | * |
387 | * If runInEventBaseThread() is called when the EventBase loop is not |
388 | * running, the function call will be delayed until the next time the loop is |
389 | * started. |
390 | * |
391 | * If runInEventBaseThread() returns true the function has successfully been |
392 | * scheduled to run in the loop thread. However, if the loop is terminated |
393 | * (and never later restarted) before it has a chance to run the requested |
394 | * function, the function will be run upon the EventBase's destruction. |
395 | * |
396 | * If two calls to runInEventBaseThread() are made from the same thread, the |
397 | * functions will always be run in the order that they were scheduled. |
398 | * Ordering between functions scheduled from separate threads is not |
399 | * guaranteed. |
400 | * |
401 | * @param fn The function to run. The function must not throw any |
402 | * exceptions. |
403 | * @param arg An argument to pass to the function. |
404 | * |
405 | * @return Returns true if the function was successfully scheduled, or false |
406 | * if there was an error scheduling the function. |
407 | */ |
408 | template <typename T> |
409 | bool runInEventBaseThread(void (*fn)(T*), T* arg); |
410 | |
411 | /** |
412 | * Run the specified function in the EventBase's thread |
413 | * |
 * This version of runInEventBaseThread() takes a folly::Function object.
 * Note that this may be less efficient than the version that takes a plain
 * function pointer and a T* argument, if moving the function is expensive
 * (e.g., if it wraps a lambda which captures some values with expensive move
 * constructors).
419 | * |
420 | * If the loop is terminated (and never later restarted) before it has a |
421 | * chance to run the requested function, the function will be run upon the |
422 | * EventBase's destruction. |
423 | * |
424 | * The function must not throw any exceptions. |
425 | */ |
426 | bool runInEventBaseThread(Func fn); |
427 | |
428 | /** |
429 | * Run the specified function in the EventBase's thread. |
430 | * |
431 | * This method is thread-safe, and may be called from another thread. |
432 | * |
433 | * If runInEventBaseThreadAlwaysEnqueue() is called when the EventBase loop is |
434 | * not running, the function call will be delayed until the next time the loop |
435 | * is started. |
436 | * |
437 | * If runInEventBaseThreadAlwaysEnqueue() returns true the function has |
438 | * successfully been scheduled to run in the loop thread. However, if the |
439 | * loop is terminated (and never later restarted) before it has a chance to |
440 | * run the requested function, the function will be run upon the EventBase's |
441 | * destruction. |
442 | * |
443 | * If two calls to runInEventBaseThreadAlwaysEnqueue() are made from the same |
444 | * thread, the functions will always be run in the order that they were |
445 | * scheduled. Ordering between functions scheduled from separate threads is |
446 | * not guaranteed. If a call is made from the EventBase thread, the function |
447 | * will not be executed inline and will be queued to the same queue as if the |
448 | * call would have been made from a different thread |
449 | * |
450 | * @param fn The function to run. The function must not throw any |
451 | * exceptions. |
452 | * @param arg An argument to pass to the function. |
453 | * |
454 | * @return Returns true if the function was successfully scheduled, or false |
455 | * if there was an error scheduling the function. |
456 | */ |
457 | template <typename T> |
458 | bool runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg); |
459 | |
460 | /** |
461 | * Run the specified function in the EventBase's thread |
462 | * |
 * This version of runInEventBaseThreadAlwaysEnqueue() takes a folly::Function
 * object. Note that this may be less efficient than the version that takes a
 * plain function pointer and a T* argument, if moving the function is
 * expensive (e.g., if it wraps a lambda which captures some values with
 * expensive move constructors).
468 | * |
469 | * If the loop is terminated (and never later restarted) before it has a |
470 | * chance to run the requested function, the function will be run upon the |
471 | * EventBase's destruction. |
472 | * |
473 | * The function must not throw any exceptions. |
474 | */ |
475 | bool runInEventBaseThreadAlwaysEnqueue(Func fn); |
476 | |
477 | /* |
478 | * Like runInEventBaseThread, but the caller waits for the callback to be |
479 | * executed. |
480 | */ |
481 | template <typename T> |
482 | bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg); |
483 | |
484 | /* |
485 | * Like runInEventBaseThread, but the caller waits for the callback to be |
486 | * executed. |
487 | */ |
488 | bool runInEventBaseThreadAndWait(Func fn); |
489 | |
490 | /* |
491 | * Like runInEventBaseThreadAndWait, except if the caller is already in the |
492 | * event base thread, the functor is simply run inline. |
493 | */ |
494 | template <typename T> |
495 | bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg); |
496 | |
497 | /* |
498 | * Like runInEventBaseThreadAndWait, except if the caller is already in the |
499 | * event base thread, the functor is simply run inline. |
500 | */ |
501 | bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn); |
502 | |
/**
 * Set the maximum desired latency in microseconds and provide a callback
 * which will be called when that latency is exceeded.
 *
 * NOTE: This functionality depends on time-measurement; the assert below
 * checks that this EventBase was created with time measurement enabled.
 *
 * @param maxLatency     Latency limit, stored in maxLatency_ (that member is
 *                       documented as "0 disables").
 * @param maxLatencyCob  Callback for an exceeded limit; moved into
 *                       maxLatencyCob_.
 */
void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) {
  assert(enableTimeMeasurement_);
  maxLatency_ = maxLatency;
  maxLatencyCob_ = std::move(maxLatencyCob);
}
513 | |
514 | /** |
515 | * Set smoothing coefficient for loop load average; # of milliseconds |
516 | * for exp(-1) (1/2.71828...) decay. |
517 | */ |
518 | void setLoadAvgMsec(std::chrono::milliseconds ms); |
519 | |
520 | /** |
521 | * reset the load average to a desired value |
522 | */ |
523 | void resetLoadAvg(double value = 0.0); |
524 | |
/**
 * Get the average loop time in microseconds (an exponentially-smoothed
 * average), as reported by avgLoopTime_.get().
 *
 * Depends on time-measurement: asserts that enableTimeMeasurement_ is set.
 */
double getAvgLoopTime() const {
  assert(enableTimeMeasurement_);
  return avgLoopTime_.get();
}
532 | |
533 | /** |
534 | * check if the event base loop is running. |
535 | */ |
536 | bool isRunning() const { |
537 | return loopThread_.load(std::memory_order_relaxed) != std::thread::id(); |
538 | } |
539 | |
540 | /** |
541 | * wait until the event loop starts (after starting the event loop thread). |
542 | */ |
543 | void waitUntilRunning(); |
544 | |
545 | size_t getNotificationQueueSize() const; |
546 | |
547 | void setMaxReadAtOnce(uint32_t maxAtOnce); |
548 | |
549 | /** |
550 | * Verify that current thread is the EventBase thread, if the EventBase is |
551 | * running. |
552 | */ |
553 | bool isInEventBaseThread() const { |
554 | auto tid = loopThread_.load(std::memory_order_relaxed); |
555 | return tid == std::thread::id() || tid == std::this_thread::get_id(); |
556 | } |
557 | |
558 | bool inRunningEventBaseThread() const { |
559 | return loopThread_.load(std::memory_order_relaxed) == |
560 | std::this_thread::get_id(); |
561 | } |
562 | |
563 | /** |
564 | * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for |
565 | * dcheckIsInEventBaseThread), but it prints more information on |
566 | * failure. |
567 | */ |
568 | void checkIsInEventBaseThread() const; |
569 | void dcheckIsInEventBaseThread() const { |
570 | if (kIsDebug) { |
571 | checkIsInEventBaseThread(); |
572 | } |
573 | } |
574 | |
575 | HHWheelTimer& timer() { |
576 | if (!wheelTimer_) { |
577 | wheelTimer_ = HHWheelTimer::newTimer(this); |
578 | } |
579 | return *wheelTimer_.get(); |
580 | } |
581 | |
// --------- interface to underlying libevent base ------------
// Avoid using these functions if possible. These functions are not
// guaranteed to always be present if we ever provide alternative EventBase
// implementations that do not use libevent internally.

// Returns the raw libevent event_base pointer held in evb_, unchanged.
// NOTE(review): the constructor taking an event_base documents that the
// EventBase frees it on destruction, so callers presumably must not free
// the returned pointer -- confirm.
event_base* getLibeventBase() const {
  return evb_;
}
589 | static const char* getLibeventVersion(); |
590 | static const char* getLibeventMethod(); |
591 | |
592 | /** |
593 | * only EventHandler/AsyncTimeout subclasses and ourselves should |
594 | * ever call this. |
595 | * |
596 | * This is used to mark the beginning of a new loop cycle by the |
597 | * first handler fired within that cycle. |
598 | * |
599 | */ |
600 | void bumpHandlingTime() final; |
601 | |
602 | class SmoothLoopTime { |
603 | public: |
604 | explicit SmoothLoopTime(std::chrono::microseconds timeInterval) |
605 | : expCoeff_(-1.0 / timeInterval.count()), value_(0.0) { |
606 | VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; |
607 | } |
608 | |
609 | void setTimeInterval(std::chrono::microseconds timeInterval); |
610 | void reset(double value = 0.0); |
611 | |
612 | void addSample( |
613 | std::chrono::microseconds total, |
614 | std::chrono::microseconds busy); |
615 | |
616 | double get() const { |
617 | // Add the outstanding buffered times linearly, to avoid |
618 | // expensive exponentiation |
619 | auto lcoeff = buffer_time_.count() * -expCoeff_; |
620 | return value_ * (1.0 - lcoeff) + lcoeff * busy_buffer_.count(); |
621 | } |
622 | |
623 | void dampen(double factor) { |
624 | value_ *= factor; |
625 | } |
626 | |
627 | private: |
628 | double expCoeff_; |
629 | double value_; |
630 | std::chrono::microseconds buffer_time_{0}; |
631 | std::chrono::microseconds busy_buffer_{0}; |
632 | std::size_t buffer_cnt_{0}; |
633 | static constexpr std::chrono::milliseconds buffer_interval_{10}; |
634 | }; |
635 | |
// Installs the EventBaseObserver for this EventBase by copying the given
// shared_ptr into observer_. The observer depends on time-measurement, so
// this asserts that enableTimeMeasurement_ is set.
void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
  assert(enableTimeMeasurement_);
  observer_ = observer;
}
640 | |
// Returns a reference to the observer_ member itself (no copy is made), so
// the reference is only valid while this EventBase is alive.
const std::shared_ptr<EventBaseObserver>& getObserver() {
  return observer_;
}
644 | |
/**
 * Setup execution observation/instrumentation for every EventHandler
 * executed in this EventBase.
 *
 * Only the raw pointer is stored; this function does not take ownership.
 *
 * @param observer EventHandler's execution observer.
 */
void setExecutionObserver(ExecutionObserver* observer) {
  executionObserver_ = observer;
}
654 | |
/**
 * Gets the execution observer associated with this EventBase, i.e. the
 * current value of executionObserver_ as stored by setExecutionObserver().
 */
ExecutionObserver* getExecutionObserver() {
  return executionObserver_;
}
661 | |
662 | /** |
663 | * Set the name of the thread that runs this event base. |
664 | */ |
665 | void setName(const std::string& name); |
666 | |
667 | /** |
668 | * Returns the name of the thread that runs this event base. |
669 | */ |
670 | const std::string& getName(); |
671 | |
/// Implements the Executor interface
void add(Cob fn) override {
  // runInEventBaseThread(Func) takes its argument by value, so fn is moved
  // into the call. The returned bool is discarded because add() returns
  // void.
  runInEventBaseThread(std::move(fn));
}
678 | |
/// Implements the DrivableExecutor interface
///
/// Runs a single loopOnce() pass while holding loopKeepAliveCount_
/// incremented by one; the result of loopOnce() is discarded.
void drive() override {
  ++loopKeepAliveCount_;
  // Undo the increment when this scope is left, however it is left.
  SCOPE_EXIT {
    --loopKeepAliveCount_;
  };
  loopOnce();
}
687 | |
688 | // Implements the ScheduledExecutor interface |
689 | void scheduleAt(Func&& fn, TimePoint const& timeout) override; |
690 | |
691 | // TimeoutManager |
692 | void attachTimeoutManager( |
693 | AsyncTimeout* obj, |
694 | TimeoutManager::InternalEnum internal) final; |
695 | |
696 | void detachTimeoutManager(AsyncTimeout* obj) final; |
697 | |
698 | bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) |
699 | final; |
700 | |
701 | void cancelTimeout(AsyncTimeout* obj) final; |
702 | |
// TimeoutManager override: forwards to isInEventBaseThread(), so it also
// returns true while no loop is running.
bool isInTimeoutManagerThread() final {
  return isInEventBaseThread();
}
706 | |
707 | // Returns a VirtualEventBase attached to this EventBase. Can be used to |
708 | // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be |
709 | // destroyed together with the EventBase. |
710 | // |
711 | // Any number of VirtualEventBases instances may be independently constructed, |
712 | // which are backed by this EventBase. This method should be only used if you |
713 | // don't need to manage the life time of the VirtualEventBase used. |
714 | folly::VirtualEventBase& getVirtualEventBase(); |
715 | |
716 | /// Implements the IOExecutor interface |
717 | EventBase* getEventBase() override; |
718 | |
719 | protected: |
720 | bool keepAliveAcquire() override { |
721 | if (inRunningEventBaseThread()) { |
722 | loopKeepAliveCount_++; |
723 | } else { |
724 | loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); |
725 | } |
726 | return true; |
727 | } |
728 | |
729 | void keepAliveRelease() override { |
730 | if (!inRunningEventBaseThread()) { |
731 | return add([this] { loopKeepAliveCount_--; }); |
732 | } |
733 | loopKeepAliveCount_--; |
734 | } |
735 | |
736 | private: |
737 | void applyLoopKeepAlive(); |
738 | |
739 | ssize_t loopKeepAliveCount(); |
740 | |
741 | /* |
742 | * Helper function that tells us whether we have already handled |
743 | * some event/timeout/callback in this loop iteration. |
744 | */ |
745 | bool nothingHandledYet() const noexcept; |
746 | |
747 | typedef LoopCallback::List LoopCallbackList; |
748 | class FunctionRunner; |
749 | |
750 | bool loopBody(int flags = 0, bool ignoreKeepAlive = false); |
751 | |
752 | // executes any callbacks queued by runInLoop(); returns false if none found |
753 | bool runLoopCallbacks(); |
754 | |
755 | void initNotificationQueue(); |
756 | |
757 | // should only be accessed through public getter |
758 | HHWheelTimer::UniquePtr wheelTimer_; |
759 | |
760 | LoopCallbackList loopCallbacks_; |
761 | LoopCallbackList runBeforeLoopCallbacks_; |
762 | LoopCallbackList onDestructionCallbacks_; |
763 | |
764 | // This will be null most of the time, but point to currentCallbacks |
765 | // if we are in the middle of running loop callbacks, such that |
766 | // runInLoop(..., true) will always run in the current loop |
767 | // iteration. |
768 | LoopCallbackList* runOnceCallbacks_; |
769 | |
770 | // stop_ is set by terminateLoopSoon() and is used by the main loop |
771 | // to determine if it should exit |
772 | std::atomic<bool> stop_; |
773 | |
774 | // The ID of the thread running the main loop. |
775 | // std::thread::id{} if loop is not running. |
776 | std::atomic<std::thread::id> loopThread_; |
777 | |
778 | // pointer to underlying event_base class doing the heavy lifting |
779 | event_base* evb_; |
780 | |
781 | // A notification queue for runInEventBaseThread() to use |
782 | // to send function requests to the EventBase thread. |
783 | std::unique_ptr<NotificationQueue<Func>> queue_; |
784 | std::unique_ptr<FunctionRunner> fnRunner_; |
785 | ssize_t loopKeepAliveCount_{0}; |
786 | std::atomic<ssize_t> loopKeepAliveCountAtomic_{0}; |
787 | bool loopKeepAliveActive_{false}; |
788 | |
789 | // limit for latency in microseconds (0 disables) |
790 | std::chrono::microseconds maxLatency_; |
791 | |
792 | // exponentially-smoothed average loop time for latency-limiting |
793 | SmoothLoopTime avgLoopTime_; |
794 | |
795 | // smoothed loop time used to invoke latency callbacks; differs from |
796 | // avgLoopTime_ in that it's scaled down after triggering a callback |
797 | // to reduce spamminess |
798 | SmoothLoopTime maxLatencyLoopTime_; |
799 | |
800 | // callback called when latency limit is exceeded |
801 | Func maxLatencyCob_; |
802 | |
// Enables/disables time measurements in loopBody(). If disabled, the
// following functionality, which relies on time measurement, will not
// be supported: avg loop time, observer and max latency.
806 | const bool enableTimeMeasurement_; |
807 | |
808 | // Wrap-around loop counter to detect beginning of each loop |
809 | std::size_t nextLoopCnt_; |
810 | std::size_t latestLoopCnt_; |
811 | std::chrono::steady_clock::time_point startWork_; |
812 | // Prevent undefined behavior from invoking event_base_loop() reentrantly. |
813 | // This is needed since many projects use libevent-1.4, which lacks commit |
814 | // b557b175c00dc462c1fce25f6e7dd67121d2c001 from |
815 | // https://github.com/libevent/libevent/. |
816 | bool invokingLoop_{false}; |
817 | |
818 | // Observer to export counters |
819 | std::shared_ptr<EventBaseObserver> observer_; |
820 | uint32_t observerSampleCount_; |
821 | |
822 | // EventHandler's execution observer. |
823 | ExecutionObserver* executionObserver_; |
824 | |
825 | // Name of the thread running this EventBase |
826 | std::string name_; |
827 | |
// allow runOnDestruction() to be called from any thread
829 | std::mutex onDestructionCallbacksMutex_; |
830 | |
831 | // see EventBaseLocal |
832 | friend class detail::EventBaseLocalBase; |
833 | template <typename T> |
834 | friend class EventBaseLocal; |
835 | std::unordered_map<std::size_t, std::shared_ptr<void>> localStorage_; |
836 | std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_; |
837 | |
838 | folly::once_flag virtualEventBaseInitFlag_; |
839 | std::unique_ptr<VirtualEventBase> virtualEventBase_; |
840 | }; |
841 | |
842 | template <typename T> |
843 | bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) { |
844 | return runInEventBaseThread([=] { fn(arg); }); |
845 | } |
846 | |
847 | template <typename T> |
848 | bool EventBase::runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg) { |
849 | return runInEventBaseThreadAlwaysEnqueue([=] { fn(arg); }); |
850 | } |
851 | |
852 | template <typename T> |
853 | bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { |
854 | return runInEventBaseThreadAndWait([=] { fn(arg); }); |
855 | } |
856 | |
857 | template <typename T> |
858 | bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait( |
859 | void (*fn)(T*), |
860 | T* arg) { |
861 | return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); }); |
862 | } |
863 | |
864 | } // namespace folly |
865 | |