1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
#include <folly/Function.h>
#include <folly/Range.h>
#include <folly/hash/Hash.h>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
28 | |
29 | namespace folly { |
30 | |
31 | /** |
32 | * Schedules any number of functions to run at various intervals. E.g., |
33 | * |
34 | * FunctionScheduler fs; |
35 | * |
36 | * fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker"); |
37 | * fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff"); |
38 | * fs.start(); |
39 | * ........ |
40 | * fs.cancelFunction("ticker"); |
41 | * fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker"); |
42 | * ........ |
43 | * fs.shutdown(); |
44 | * |
45 | * |
46 | * Note: the class uses only one thread - if you want to use more than one |
47 | * thread, either use multiple FunctionScheduler objects, or check out |
48 | * ThreadedRepeatingFunctionRunner.h for a much simpler contract of |
49 | * "run each function periodically in its own thread". |
50 | * |
51 | * start() schedules the functions, while shutdown() terminates further |
52 | * scheduling. |
53 | */ |
54 | class FunctionScheduler { |
55 | public: |
56 | FunctionScheduler(); |
57 | ~FunctionScheduler(); |
58 | |
59 | /** |
60 | * By default steady is false, meaning schedules may lag behind overtime. |
61 | * This could be due to long running tasks or time drift because of randomness |
62 | * in thread wakeup time. |
63 | * By setting steady to true, FunctionScheduler will attempt to catch up. |
64 | * i.e. more like a cronjob |
65 | * |
66 | * NOTE: it's only safe to set this before calling start() |
67 | */ |
68 | void setSteady(bool steady) { |
69 | steady_ = steady; |
70 | } |
71 | |
72 | /* |
73 | * Parameters to control the function interval. |
74 | * |
75 | * If isPoisson is true, then use std::poisson_distribution to pick the |
76 | * interval between each invocation of the function. |
77 | * |
78 | * If isPoisson is false, then always use the fixed interval specified to |
79 | * addFunction(). |
80 | */ |
81 | struct LatencyDistribution { |
82 | bool isPoisson; |
83 | double poissonMean; |
84 | |
85 | LatencyDistribution(bool poisson, double mean) |
86 | : isPoisson(poisson), poissonMean(mean) {} |
87 | }; |
88 | |
89 | /** |
90 | * Adds a new function to the FunctionScheduler. |
91 | * |
92 | * Functions will not be run until start() is called. When start() is |
93 | * called, each function will be run after its specified startDelay. |
94 | * Functions may also be added after start() has been called, in which case |
95 | * startDelay is still honored. |
96 | * |
97 | * Throws an exception on error. In particular, each function must have a |
98 | * unique name--two functions cannot be added with the same name. |
99 | */ |
100 | void addFunction( |
101 | Function<void()>&& cb, |
102 | std::chrono::milliseconds interval, |
103 | StringPiece nameID = StringPiece(), |
104 | std::chrono::milliseconds startDelay = std::chrono::milliseconds(0)); |
105 | |
106 | /* |
107 | * Add a new function to the FunctionScheduler with a specified |
108 | * LatencyDistribution |
109 | */ |
110 | void addFunction( |
111 | Function<void()>&& cb, |
112 | std::chrono::milliseconds interval, |
113 | const LatencyDistribution& latencyDistr, |
114 | StringPiece nameID = StringPiece(), |
115 | std::chrono::milliseconds startDelay = std::chrono::milliseconds(0)); |
116 | |
117 | /** |
118 | * Adds a new function to the FunctionScheduler to run only once. |
119 | */ |
120 | void addFunctionOnce( |
121 | Function<void()>&& cb, |
122 | StringPiece nameID = StringPiece(), |
123 | std::chrono::milliseconds startDelay = std::chrono::milliseconds(0)); |
124 | |
125 | /** |
126 | * Add a new function to the FunctionScheduler with the time |
127 | * interval being distributed uniformly within the given interval |
128 | * [minInterval, maxInterval]. |
129 | */ |
130 | void addFunctionUniformDistribution( |
131 | Function<void()>&& cb, |
132 | std::chrono::milliseconds minInterval, |
133 | std::chrono::milliseconds maxInterval, |
134 | StringPiece nameID, |
135 | std::chrono::milliseconds startDelay); |
136 | |
137 | /** |
138 | * Add a new function to the FunctionScheduler whose start times are attempted |
139 | * to be scheduled so that they are congruent modulo the interval. |
140 | * Note: The scheduling of the next run time happens right before the function |
141 | * invocation, so the first time a function takes more time than the interval, |
142 | * it will be reinvoked immediately. |
143 | */ |
144 | void addFunctionConsistentDelay( |
145 | Function<void()>&& cb, |
146 | std::chrono::milliseconds interval, |
147 | StringPiece nameID = StringPiece(), |
148 | std::chrono::milliseconds startDelay = std::chrono::milliseconds(0)); |
149 | |
150 | /** |
151 | * A type alias for function that is called to determine the time |
152 | * interval for the next scheduled run. |
153 | */ |
154 | using IntervalDistributionFunc = Function<std::chrono::milliseconds()>; |
155 | /** |
156 | * A type alias for function that returns the next run time, given the current |
157 | * run time and the current start time. |
158 | */ |
159 | using NextRunTimeFunc = Function<std::chrono::steady_clock::time_point( |
160 | std::chrono::steady_clock::time_point, |
161 | std::chrono::steady_clock::time_point)>; |
162 | |
163 | /** |
164 | * Add a new function to the FunctionScheduler. The scheduling interval |
165 | * is determined by the interval distribution functor, which is called |
166 | * every time the next function execution is scheduled. This allows |
167 | * for supporting custom interval distribution algorithms in addition |
168 | * to built in constant interval; and Poisson and jitter distributions |
169 | * (@see FunctionScheduler::addFunction and |
170 | * @see FunctionScheduler::addFunctionJitterInterval). |
171 | */ |
172 | void addFunctionGenericDistribution( |
173 | Function<void()>&& cb, |
174 | IntervalDistributionFunc&& intervalFunc, |
175 | const std::string& nameID, |
176 | const std::string& intervalDescr, |
177 | std::chrono::milliseconds startDelay); |
178 | |
179 | /** |
180 | * Like addFunctionGenericDistribution, adds a new function to the |
181 | * FunctionScheduler, but the next run time is determined directly by the |
182 | * given functor, rather than by adding an interval. |
183 | */ |
184 | void addFunctionGenericNextRunTimeFunctor( |
185 | Function<void()>&& cb, |
186 | NextRunTimeFunc&& fn, |
187 | const std::string& nameID, |
188 | const std::string& intervalDescr, |
189 | std::chrono::milliseconds startDelay); |
190 | |
191 | /** |
192 | * Cancels the function with the specified name, so it will no longer be run. |
193 | * |
194 | * Returns false if no function exists with the specified name. |
195 | */ |
196 | bool cancelFunction(StringPiece nameID); |
197 | bool cancelFunctionAndWait(StringPiece nameID); |
198 | |
199 | /** |
200 | * All functions registered will be canceled. |
201 | */ |
202 | void cancelAllFunctions(); |
203 | void cancelAllFunctionsAndWait(); |
204 | |
205 | /** |
206 | * Resets the specified function's timer. |
207 | * When resetFunctionTimer is called, the specified function's timer will |
208 | * be reset with the same parameters it was passed initially, including |
209 | * its startDelay. If the startDelay was 0, the function will be invoked |
210 | * immediately. |
211 | * |
212 | * Returns false if no function exists with the specified name. |
213 | */ |
214 | bool resetFunctionTimer(StringPiece nameID); |
215 | |
216 | /** |
217 | * Starts the scheduler. |
218 | * |
219 | * Returns false if the scheduler was already running. |
220 | */ |
221 | bool start(); |
222 | |
223 | /** |
224 | * Stops the FunctionScheduler. |
225 | * |
226 | * It may be restarted later by calling start() again. |
227 | * Returns false if the scheduler was not running. |
228 | */ |
229 | bool shutdown(); |
230 | |
231 | /** |
232 | * Set the name of the worker thread. |
233 | */ |
234 | void setThreadName(StringPiece threadName); |
235 | |
236 | private: |
237 | struct RepeatFunc { |
238 | Function<void()> cb; |
239 | NextRunTimeFunc nextRunTimeFunc; |
240 | std::chrono::steady_clock::time_point nextRunTime; |
241 | std::string name; |
242 | std::chrono::milliseconds startDelay; |
243 | std::string intervalDescr; |
244 | bool runOnce; |
245 | |
246 | RepeatFunc( |
247 | Function<void()>&& cback, |
248 | IntervalDistributionFunc&& intervalFn, |
249 | const std::string& nameID, |
250 | const std::string& intervalDistDescription, |
251 | std::chrono::milliseconds delay, |
252 | bool once) |
253 | : RepeatFunc( |
254 | std::move(cback), |
255 | getNextRunTimeFunc(std::move(intervalFn)), |
256 | nameID, |
257 | intervalDistDescription, |
258 | delay, |
259 | once) {} |
260 | |
261 | RepeatFunc( |
262 | Function<void()>&& cback, |
263 | NextRunTimeFunc&& nextRunTimeFn, |
264 | const std::string& nameID, |
265 | const std::string& intervalDistDescription, |
266 | std::chrono::milliseconds delay, |
267 | bool once) |
268 | : cb(std::move(cback)), |
269 | nextRunTimeFunc(std::move(nextRunTimeFn)), |
270 | nextRunTime(), |
271 | name(nameID), |
272 | startDelay(delay), |
273 | intervalDescr(intervalDistDescription), |
274 | runOnce(once) {} |
275 | |
276 | static NextRunTimeFunc getNextRunTimeFunc( |
277 | IntervalDistributionFunc&& intervalFn) { |
278 | return [intervalFn = std::move(intervalFn)]( |
279 | std::chrono::steady_clock::time_point /* curNextRunTime */, |
280 | std::chrono::steady_clock::time_point curTime) mutable { |
281 | return curTime + intervalFn(); |
282 | }; |
283 | } |
284 | |
285 | std::chrono::steady_clock::time_point getNextRunTime() const { |
286 | return nextRunTime; |
287 | } |
288 | void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) { |
289 | nextRunTime = nextRunTimeFunc(nextRunTime, curTime); |
290 | } |
291 | void setNextRunTimeSteady() { |
292 | nextRunTime = nextRunTimeFunc(nextRunTime, nextRunTime); |
293 | } |
294 | void resetNextRunTime(std::chrono::steady_clock::time_point curTime) { |
295 | nextRunTime = curTime + startDelay; |
296 | } |
297 | void cancel() { |
298 | // Simply reset cb to an empty function. |
299 | cb = {}; |
300 | } |
301 | bool isValid() const { |
302 | return bool(cb); |
303 | } |
304 | }; |
305 | |
306 | struct RunTimeOrder { |
307 | bool operator()( |
308 | const std::unique_ptr<RepeatFunc>& f1, |
309 | const std::unique_ptr<RepeatFunc>& f2) const { |
310 | return f1->getNextRunTime() > f2->getNextRunTime(); |
311 | } |
312 | }; |
313 | |
314 | typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap; |
315 | typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap; |
316 | |
317 | void run(); |
318 | void runOneFunction( |
319 | std::unique_lock<std::mutex>& lock, |
320 | std::chrono::steady_clock::time_point now); |
321 | void cancelFunction(const std::unique_lock<std::mutex>& lock, RepeatFunc* it); |
322 | void addFunctionToHeap( |
323 | const std::unique_lock<std::mutex>& lock, |
324 | std::unique_ptr<RepeatFunc> func); |
325 | |
326 | template <typename RepeatFuncNextRunTimeFunc> |
327 | void addFunctionToHeapChecked( |
328 | Function<void()>&& cb, |
329 | RepeatFuncNextRunTimeFunc&& fn, |
330 | const std::string& nameID, |
331 | const std::string& intervalDescr, |
332 | std::chrono::milliseconds startDelay, |
333 | bool runOnce); |
334 | |
335 | void addFunctionInternal( |
336 | Function<void()>&& cb, |
337 | NextRunTimeFunc&& fn, |
338 | const std::string& nameID, |
339 | const std::string& intervalDescr, |
340 | std::chrono::milliseconds startDelay, |
341 | bool runOnce); |
342 | void addFunctionInternal( |
343 | Function<void()>&& cb, |
344 | IntervalDistributionFunc&& fn, |
345 | const std::string& nameID, |
346 | const std::string& intervalDescr, |
347 | std::chrono::milliseconds startDelay, |
348 | bool runOnce); |
349 | |
350 | // Return true if the current function is being canceled |
351 | bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock); |
352 | bool cancelFunctionWithLock( |
353 | std::unique_lock<std::mutex>& lock, |
354 | StringPiece nameID); |
355 | |
356 | std::thread thread_; |
357 | |
358 | // Mutex to protect our member variables. |
359 | std::mutex mutex_; |
360 | bool running_{false}; |
361 | |
362 | // The functions to run. |
363 | // This is a heap, ordered by next run time. |
364 | FunctionHeap functions_; |
365 | FunctionMap functionsMap_; |
366 | RunTimeOrder fnCmp_; |
367 | |
368 | // The function currently being invoked by the running thread. |
369 | // This is null when the running thread is idle |
370 | RepeatFunc* currentFunction_{nullptr}; |
371 | |
372 | // Condition variable that is signalled whenever a new function is added |
373 | // or when the FunctionScheduler is stopped. |
374 | std::condition_variable runningCondvar_; |
375 | |
376 | std::string threadName_; |
377 | bool steady_{false}; |
378 | bool cancellingCurrentFunction_{false}; |
379 | }; |
380 | |
381 | } // namespace folly |
382 | |