1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/Function.h>
20#include <folly/Range.h>
21#include <folly/hash/Hash.h>
22#include <chrono>
23#include <condition_variable>
24#include <mutex>
25#include <thread>
26#include <unordered_map>
27#include <vector>
28
29namespace folly {
30
31/**
32 * Schedules any number of functions to run at various intervals. E.g.,
33 *
34 * FunctionScheduler fs;
35 *
36 * fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
37 * fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
38 * fs.start();
39 * ........
40 * fs.cancelFunction("ticker");
41 * fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
42 * ........
43 * fs.shutdown();
44 *
45 *
46 * Note: the class uses only one thread - if you want to use more than one
47 * thread, either use multiple FunctionScheduler objects, or check out
48 * ThreadedRepeatingFunctionRunner.h for a much simpler contract of
49 * "run each function periodically in its own thread".
50 *
51 * start() schedules the functions, while shutdown() terminates further
52 * scheduling.
53 */
54class FunctionScheduler {
55 public:
56 FunctionScheduler();
57 ~FunctionScheduler();
58
59 /**
60 * By default steady is false, meaning schedules may lag behind overtime.
61 * This could be due to long running tasks or time drift because of randomness
62 * in thread wakeup time.
63 * By setting steady to true, FunctionScheduler will attempt to catch up.
64 * i.e. more like a cronjob
65 *
66 * NOTE: it's only safe to set this before calling start()
67 */
68 void setSteady(bool steady) {
69 steady_ = steady;
70 }
71
72 /*
73 * Parameters to control the function interval.
74 *
75 * If isPoisson is true, then use std::poisson_distribution to pick the
76 * interval between each invocation of the function.
77 *
78 * If isPoisson is false, then always use the fixed interval specified to
79 * addFunction().
80 */
81 struct LatencyDistribution {
82 bool isPoisson;
83 double poissonMean;
84
85 LatencyDistribution(bool poisson, double mean)
86 : isPoisson(poisson), poissonMean(mean) {}
87 };
88
89 /**
90 * Adds a new function to the FunctionScheduler.
91 *
92 * Functions will not be run until start() is called. When start() is
93 * called, each function will be run after its specified startDelay.
94 * Functions may also be added after start() has been called, in which case
95 * startDelay is still honored.
96 *
97 * Throws an exception on error. In particular, each function must have a
98 * unique name--two functions cannot be added with the same name.
99 */
100 void addFunction(
101 Function<void()>&& cb,
102 std::chrono::milliseconds interval,
103 StringPiece nameID = StringPiece(),
104 std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
105
106 /*
107 * Add a new function to the FunctionScheduler with a specified
108 * LatencyDistribution
109 */
110 void addFunction(
111 Function<void()>&& cb,
112 std::chrono::milliseconds interval,
113 const LatencyDistribution& latencyDistr,
114 StringPiece nameID = StringPiece(),
115 std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
116
117 /**
118 * Adds a new function to the FunctionScheduler to run only once.
119 */
120 void addFunctionOnce(
121 Function<void()>&& cb,
122 StringPiece nameID = StringPiece(),
123 std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
124
125 /**
126 * Add a new function to the FunctionScheduler with the time
127 * interval being distributed uniformly within the given interval
128 * [minInterval, maxInterval].
129 */
130 void addFunctionUniformDistribution(
131 Function<void()>&& cb,
132 std::chrono::milliseconds minInterval,
133 std::chrono::milliseconds maxInterval,
134 StringPiece nameID,
135 std::chrono::milliseconds startDelay);
136
137 /**
138 * Add a new function to the FunctionScheduler whose start times are attempted
139 * to be scheduled so that they are congruent modulo the interval.
140 * Note: The scheduling of the next run time happens right before the function
141 * invocation, so the first time a function takes more time than the interval,
142 * it will be reinvoked immediately.
143 */
144 void addFunctionConsistentDelay(
145 Function<void()>&& cb,
146 std::chrono::milliseconds interval,
147 StringPiece nameID = StringPiece(),
148 std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
149
150 /**
151 * A type alias for function that is called to determine the time
152 * interval for the next scheduled run.
153 */
154 using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
155 /**
156 * A type alias for function that returns the next run time, given the current
157 * run time and the current start time.
158 */
159 using NextRunTimeFunc = Function<std::chrono::steady_clock::time_point(
160 std::chrono::steady_clock::time_point,
161 std::chrono::steady_clock::time_point)>;
162
163 /**
164 * Add a new function to the FunctionScheduler. The scheduling interval
165 * is determined by the interval distribution functor, which is called
166 * every time the next function execution is scheduled. This allows
167 * for supporting custom interval distribution algorithms in addition
168 * to built in constant interval; and Poisson and jitter distributions
169 * (@see FunctionScheduler::addFunction and
170 * @see FunctionScheduler::addFunctionJitterInterval).
171 */
172 void addFunctionGenericDistribution(
173 Function<void()>&& cb,
174 IntervalDistributionFunc&& intervalFunc,
175 const std::string& nameID,
176 const std::string& intervalDescr,
177 std::chrono::milliseconds startDelay);
178
179 /**
180 * Like addFunctionGenericDistribution, adds a new function to the
181 * FunctionScheduler, but the next run time is determined directly by the
182 * given functor, rather than by adding an interval.
183 */
184 void addFunctionGenericNextRunTimeFunctor(
185 Function<void()>&& cb,
186 NextRunTimeFunc&& fn,
187 const std::string& nameID,
188 const std::string& intervalDescr,
189 std::chrono::milliseconds startDelay);
190
191 /**
192 * Cancels the function with the specified name, so it will no longer be run.
193 *
194 * Returns false if no function exists with the specified name.
195 */
196 bool cancelFunction(StringPiece nameID);
197 bool cancelFunctionAndWait(StringPiece nameID);
198
199 /**
200 * All functions registered will be canceled.
201 */
202 void cancelAllFunctions();
203 void cancelAllFunctionsAndWait();
204
205 /**
206 * Resets the specified function's timer.
207 * When resetFunctionTimer is called, the specified function's timer will
208 * be reset with the same parameters it was passed initially, including
209 * its startDelay. If the startDelay was 0, the function will be invoked
210 * immediately.
211 *
212 * Returns false if no function exists with the specified name.
213 */
214 bool resetFunctionTimer(StringPiece nameID);
215
216 /**
217 * Starts the scheduler.
218 *
219 * Returns false if the scheduler was already running.
220 */
221 bool start();
222
223 /**
224 * Stops the FunctionScheduler.
225 *
226 * It may be restarted later by calling start() again.
227 * Returns false if the scheduler was not running.
228 */
229 bool shutdown();
230
231 /**
232 * Set the name of the worker thread.
233 */
234 void setThreadName(StringPiece threadName);
235
236 private:
237 struct RepeatFunc {
238 Function<void()> cb;
239 NextRunTimeFunc nextRunTimeFunc;
240 std::chrono::steady_clock::time_point nextRunTime;
241 std::string name;
242 std::chrono::milliseconds startDelay;
243 std::string intervalDescr;
244 bool runOnce;
245
246 RepeatFunc(
247 Function<void()>&& cback,
248 IntervalDistributionFunc&& intervalFn,
249 const std::string& nameID,
250 const std::string& intervalDistDescription,
251 std::chrono::milliseconds delay,
252 bool once)
253 : RepeatFunc(
254 std::move(cback),
255 getNextRunTimeFunc(std::move(intervalFn)),
256 nameID,
257 intervalDistDescription,
258 delay,
259 once) {}
260
261 RepeatFunc(
262 Function<void()>&& cback,
263 NextRunTimeFunc&& nextRunTimeFn,
264 const std::string& nameID,
265 const std::string& intervalDistDescription,
266 std::chrono::milliseconds delay,
267 bool once)
268 : cb(std::move(cback)),
269 nextRunTimeFunc(std::move(nextRunTimeFn)),
270 nextRunTime(),
271 name(nameID),
272 startDelay(delay),
273 intervalDescr(intervalDistDescription),
274 runOnce(once) {}
275
276 static NextRunTimeFunc getNextRunTimeFunc(
277 IntervalDistributionFunc&& intervalFn) {
278 return [intervalFn = std::move(intervalFn)](
279 std::chrono::steady_clock::time_point /* curNextRunTime */,
280 std::chrono::steady_clock::time_point curTime) mutable {
281 return curTime + intervalFn();
282 };
283 }
284
285 std::chrono::steady_clock::time_point getNextRunTime() const {
286 return nextRunTime;
287 }
288 void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
289 nextRunTime = nextRunTimeFunc(nextRunTime, curTime);
290 }
291 void setNextRunTimeSteady() {
292 nextRunTime = nextRunTimeFunc(nextRunTime, nextRunTime);
293 }
294 void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
295 nextRunTime = curTime + startDelay;
296 }
297 void cancel() {
298 // Simply reset cb to an empty function.
299 cb = {};
300 }
301 bool isValid() const {
302 return bool(cb);
303 }
304 };
305
306 struct RunTimeOrder {
307 bool operator()(
308 const std::unique_ptr<RepeatFunc>& f1,
309 const std::unique_ptr<RepeatFunc>& f2) const {
310 return f1->getNextRunTime() > f2->getNextRunTime();
311 }
312 };
313
314 typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
315 typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
316
317 void run();
318 void runOneFunction(
319 std::unique_lock<std::mutex>& lock,
320 std::chrono::steady_clock::time_point now);
321 void cancelFunction(const std::unique_lock<std::mutex>& lock, RepeatFunc* it);
322 void addFunctionToHeap(
323 const std::unique_lock<std::mutex>& lock,
324 std::unique_ptr<RepeatFunc> func);
325
326 template <typename RepeatFuncNextRunTimeFunc>
327 void addFunctionToHeapChecked(
328 Function<void()>&& cb,
329 RepeatFuncNextRunTimeFunc&& fn,
330 const std::string& nameID,
331 const std::string& intervalDescr,
332 std::chrono::milliseconds startDelay,
333 bool runOnce);
334
335 void addFunctionInternal(
336 Function<void()>&& cb,
337 NextRunTimeFunc&& fn,
338 const std::string& nameID,
339 const std::string& intervalDescr,
340 std::chrono::milliseconds startDelay,
341 bool runOnce);
342 void addFunctionInternal(
343 Function<void()>&& cb,
344 IntervalDistributionFunc&& fn,
345 const std::string& nameID,
346 const std::string& intervalDescr,
347 std::chrono::milliseconds startDelay,
348 bool runOnce);
349
350 // Return true if the current function is being canceled
351 bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
352 bool cancelFunctionWithLock(
353 std::unique_lock<std::mutex>& lock,
354 StringPiece nameID);
355
356 std::thread thread_;
357
358 // Mutex to protect our member variables.
359 std::mutex mutex_;
360 bool running_{false};
361
362 // The functions to run.
363 // This is a heap, ordered by next run time.
364 FunctionHeap functions_;
365 FunctionMap functionsMap_;
366 RunTimeOrder fnCmp_;
367
368 // The function currently being invoked by the running thread.
369 // This is null when the running thread is idle
370 RepeatFunc* currentFunction_{nullptr};
371
372 // Condition variable that is signalled whenever a new function is added
373 // or when the FunctionScheduler is stopped.
374 std::condition_variable runningCondvar_;
375
376 std::string threadName_;
377 bool steady_{false};
378 bool cancellingCurrentFunction_{false};
379};
380
381} // namespace folly
382