1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/experimental/FunctionScheduler.h>
18
19#include <random>
20
21#include <folly/Conv.h>
22#include <folly/Random.h>
23#include <folly/String.h>
24#include <folly/system/ThreadName.h>
25
26using std::chrono::milliseconds;
27using std::chrono::steady_clock;
28
29namespace folly {
30
31namespace {
32
// Next-run-time generator that keeps runs aligned to whole multiples of
// constInterval counted from the previously scheduled run time. Intervals
// that have already elapsed completely are skipped rather than replayed.
struct ConsistentDelayFunctor {
  const milliseconds constInterval;

  // Throws std::invalid_argument when interval is negative. Zero is accepted.
  explicit ConsistentDelayFunctor(milliseconds interval)
      : constInterval(interval) {
    if (interval < milliseconds::zero()) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  // curNextRunTime: the run time that was scheduled previously.
  // curTime:        the point in time elapsed intervals are counted up to.
  // Returns curNextRunTime advanced by (number of whole intervals between
  // curNextRunTime and curTime, plus one) intervals.
  steady_clock::time_point operator()(
      steady_clock::time_point curNextRunTime,
      steady_clock::time_point curTime) const {
    if (constInterval == milliseconds::zero()) {
      // The constructor allows a zero interval, but dividing by it below
      // would be undefined behavior. With no spacing between runs the next
      // slot is simply the later of the two time points.
      return curTime > curNextRunTime ? curTime : curNextRunTime;
    }
    // duration / duration yields a plain count (truncated toward zero).
    const auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
    return (intervalsPassed + 1) * constInterval + curNextRunTime;
  }
};
52
// Interval generator that yields the same fixed delay on every call.
struct ConstIntervalFunctor {
  const milliseconds constInterval;

  // Throws std::invalid_argument when interval is negative.
  explicit ConstIntervalFunctor(milliseconds interval)
      : constInterval(requireNonNegative(interval)) {}

  // Returns the interval supplied at construction.
  milliseconds operator()() const {
    return constInterval;
  }

 private:
  // Passes the candidate through unchanged, or throws if it is negative.
  static milliseconds requireNonNegative(milliseconds candidate) {
    if (candidate >= milliseconds::zero()) {
      return candidate;
    }
    throw std::invalid_argument(
        "FunctionScheduler: time interval must be non-negative");
  }
};
69
// Interval generator that draws each delay (in milliseconds) from a Poisson
// distribution with the given mean.
struct PoissonDistributionFunctor {
  std::default_random_engine generator;
  std::poisson_distribution<int> poissonRandom;

  // Throws std::invalid_argument when meanPoissonMs is negative or NaN.
  //
  // The mean is validated *before* it reaches std::poisson_distribution,
  // whose constructor has a precondition on the mean; validating afterwards
  // would mean the precondition had already been violated by the time we
  // throw. The engine is seeded so that separate functors do not all replay
  // the same default-seeded sequence.
  explicit PoissonDistributionFunctor(double meanPoissonMs)
      : generator(std::random_device{}()),
        poissonRandom(checkedMean(meanPoissonMs)) {}

  // Returns the next randomly drawn delay.
  milliseconds operator()() {
    return milliseconds(poissonRandom(generator));
  }

 private:
  // Returns meanPoissonMs unchanged if it is usable, otherwise throws.
  // Written as !(x >= 0) so that NaN is rejected as well.
  static double checkedMean(double meanPoissonMs) {
    if (!(meanPoissonMs >= 0.0)) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "Poisson mean interval must be non-negative");
    }
    return meanPoissonMs;
  }
};
87
88struct UniformDistributionFunctor {
89 std::default_random_engine generator;
90 std::uniform_int_distribution<milliseconds::rep> dist;
91
92 UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
93 : generator(Random::rand32()),
94 dist(minInterval.count(), maxInterval.count()) {
95 if (minInterval > maxInterval) {
96 throw std::invalid_argument(
97 "FunctionScheduler: "
98 "min time interval must be less or equal than max interval");
99 }
100 if (minInterval < milliseconds::zero()) {
101 throw std::invalid_argument(
102 "FunctionScheduler: "
103 "time interval must be non-negative");
104 }
105 }
106
107 milliseconds operator()() {
108 return milliseconds(dist(generator));
109 }
110};
111
112} // namespace
113
114FunctionScheduler::FunctionScheduler() {}
115
116FunctionScheduler::~FunctionScheduler() {
117 // make sure to stop the thread (if running)
118 shutdown();
119}
120
121void FunctionScheduler::addFunction(
122 Function<void()>&& cb,
123 milliseconds interval,
124 StringPiece nameID,
125 milliseconds startDelay) {
126 addFunctionInternal(
127 std::move(cb),
128 ConstIntervalFunctor(interval),
129 nameID.str(),
130 to<std::string>(interval.count(), "ms"),
131 startDelay,
132 false /*runOnce*/);
133}
134
135void FunctionScheduler::addFunction(
136 Function<void()>&& cb,
137 milliseconds interval,
138 const LatencyDistribution& latencyDistr,
139 StringPiece nameID,
140 milliseconds startDelay) {
141 if (latencyDistr.isPoisson) {
142 addFunctionInternal(
143 std::move(cb),
144 PoissonDistributionFunctor(latencyDistr.poissonMean),
145 nameID.str(),
146 to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
147 startDelay,
148 false /*runOnce*/);
149 } else {
150 addFunction(std::move(cb), interval, nameID, startDelay);
151 }
152}
153
154void FunctionScheduler::addFunctionOnce(
155 Function<void()>&& cb,
156 StringPiece nameID,
157 milliseconds startDelay) {
158 addFunctionInternal(
159 std::move(cb),
160 ConstIntervalFunctor(milliseconds::zero()),
161 nameID.str(),
162 "once",
163 startDelay,
164 true /*runOnce*/);
165}
166
167void FunctionScheduler::addFunctionUniformDistribution(
168 Function<void()>&& cb,
169 milliseconds minInterval,
170 milliseconds maxInterval,
171 StringPiece nameID,
172 milliseconds startDelay) {
173 addFunctionInternal(
174 std::move(cb),
175 UniformDistributionFunctor(minInterval, maxInterval),
176 nameID.str(),
177 to<std::string>(
178 "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
179 startDelay,
180 false /*runOnce*/);
181}
182
183void FunctionScheduler::addFunctionConsistentDelay(
184 Function<void()>&& cb,
185 milliseconds interval,
186 StringPiece nameID,
187 milliseconds startDelay) {
188 addFunctionInternal(
189 std::move(cb),
190 ConsistentDelayFunctor(interval),
191 nameID.str(),
192 to<std::string>(interval.count(), "ms"),
193 startDelay,
194 false /*runOnce*/);
195}
196
197void FunctionScheduler::addFunctionGenericDistribution(
198 Function<void()>&& cb,
199 IntervalDistributionFunc&& intervalFunc,
200 const std::string& nameID,
201 const std::string& intervalDescr,
202 milliseconds startDelay) {
203 addFunctionInternal(
204 std::move(cb),
205 std::move(intervalFunc),
206 nameID,
207 intervalDescr,
208 startDelay,
209 false /*runOnce*/);
210}
211
212void FunctionScheduler::addFunctionGenericNextRunTimeFunctor(
213 Function<void()>&& cb,
214 NextRunTimeFunc&& fn,
215 const std::string& nameID,
216 const std::string& intervalDescr,
217 milliseconds startDelay) {
218 addFunctionInternal(
219 std::move(cb),
220 std::move(fn),
221 nameID,
222 intervalDescr,
223 startDelay,
224 false /*runOnce*/);
225}
226
227template <typename RepeatFuncNextRunTimeFunc>
228void FunctionScheduler::addFunctionToHeapChecked(
229 Function<void()>&& cb,
230 RepeatFuncNextRunTimeFunc&& fn,
231 const std::string& nameID,
232 const std::string& intervalDescr,
233 milliseconds startDelay,
234 bool runOnce) {
235 if (!cb) {
236 throw std::invalid_argument(
237 "FunctionScheduler: Scheduled function must be set");
238 }
239 if (!fn) {
240 throw std::invalid_argument(
241 "FunctionScheduler: "
242 "interval distribution or next run time function must be set");
243 }
244 if (startDelay < milliseconds::zero()) {
245 throw std::invalid_argument(
246 "FunctionScheduler: start delay must be non-negative");
247 }
248
249 std::unique_lock<std::mutex> l(mutex_);
250 auto it = functionsMap_.find(nameID);
251 // check if the nameID is unique
252 if (it != functionsMap_.end() && it->second->isValid()) {
253 throw std::invalid_argument(to<std::string>(
254 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
255 }
256
257 if (currentFunction_ && currentFunction_->name == nameID) {
258 throw std::invalid_argument(to<std::string>(
259 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
260 }
261
262 addFunctionToHeap(
263 l,
264 std::make_unique<RepeatFunc>(
265 std::move(cb),
266 std::forward<RepeatFuncNextRunTimeFunc>(fn),
267 nameID,
268 intervalDescr,
269 startDelay,
270 runOnce));
271}
272
273void FunctionScheduler::addFunctionInternal(
274 Function<void()>&& cb,
275 NextRunTimeFunc&& fn,
276 const std::string& nameID,
277 const std::string& intervalDescr,
278 milliseconds startDelay,
279 bool runOnce) {
280 return addFunctionToHeapChecked(
281 std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
282}
283
284void FunctionScheduler::addFunctionInternal(
285 Function<void()>&& cb,
286 IntervalDistributionFunc&& fn,
287 const std::string& nameID,
288 const std::string& intervalDescr,
289 milliseconds startDelay,
290 bool runOnce) {
291 return addFunctionToHeapChecked(
292 std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
293}
294
295bool FunctionScheduler::cancelFunctionWithLock(
296 std::unique_lock<std::mutex>& lock,
297 StringPiece nameID) {
298 CHECK_EQ(lock.owns_lock(), true);
299 if (currentFunction_ && currentFunction_->name == nameID) {
300 functionsMap_.erase(currentFunction_->name);
301 // This function is currently being run. Clear currentFunction_
302 // The running thread will see this and won't reschedule the function.
303 currentFunction_ = nullptr;
304 cancellingCurrentFunction_ = true;
305 return true;
306 }
307 return false;
308}
309
310bool FunctionScheduler::cancelFunction(StringPiece nameID) {
311 std::unique_lock<std::mutex> l(mutex_);
312 if (cancelFunctionWithLock(l, nameID)) {
313 return true;
314 }
315 auto it = functionsMap_.find(nameID);
316 if (it != functionsMap_.end() && it->second->isValid()) {
317 cancelFunction(l, it->second);
318 return true;
319 }
320
321 return false;
322}
323
324bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
325 std::unique_lock<std::mutex> l(mutex_);
326
327 if (cancelFunctionWithLock(l, nameID)) {
328 runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
329 return true;
330 }
331
332 auto it = functionsMap_.find(nameID);
333 if (it != functionsMap_.end() && it->second->isValid()) {
334 cancelFunction(l, it->second);
335 return true;
336 }
337 return false;
338}
339
340void FunctionScheduler::cancelFunction(
341 const std::unique_lock<std::mutex>& l,
342 RepeatFunc* it) {
343 // This function should only be called with mutex_ already locked.
344 DCHECK(l.mutex() == &mutex_);
345 DCHECK(l.owns_lock());
346 functionsMap_.erase(it->name);
347 it->cancel();
348}
349
350bool FunctionScheduler::cancelAllFunctionsWithLock(
351 std::unique_lock<std::mutex>& lock) {
352 CHECK_EQ(lock.owns_lock(), true);
353 functions_.clear();
354 functionsMap_.clear();
355 if (currentFunction_) {
356 cancellingCurrentFunction_ = true;
357 }
358 currentFunction_ = nullptr;
359 return cancellingCurrentFunction_;
360}
361
362void FunctionScheduler::cancelAllFunctions() {
363 std::unique_lock<std::mutex> l(mutex_);
364 cancelAllFunctionsWithLock(l);
365}
366
367void FunctionScheduler::cancelAllFunctionsAndWait() {
368 std::unique_lock<std::mutex> l(mutex_);
369 if (cancelAllFunctionsWithLock(l)) {
370 runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
371 }
372}
373
374bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
375 std::unique_lock<std::mutex> l(mutex_);
376 if (currentFunction_ && currentFunction_->name == nameID) {
377 if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
378 return false;
379 }
380 currentFunction_->resetNextRunTime(steady_clock::now());
381 return true;
382 }
383
384 // Since __adjust_heap() isn't a part of the standard API, there's no way to
385 // fix the heap ordering if we adjust the key (nextRunTime) for the existing
386 // RepeatFunc. Instead, we just cancel it and add an identical object.
387 auto it = functionsMap_.find(nameID);
388 if (it != functionsMap_.end() && it->second->isValid()) {
389 if (running_) {
390 it->second->resetNextRunTime(steady_clock::now());
391 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
392 runningCondvar_.notify_one();
393 }
394 return true;
395 }
396 return false;
397}
398
399bool FunctionScheduler::start() {
400 std::unique_lock<std::mutex> l(mutex_);
401 if (running_) {
402 return false;
403 }
404
405 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
406 << " functions.";
407 auto now = steady_clock::now();
408 // Reset the next run time. for all functions.
409 // note: this is needed since one can shutdown() and start() again
410 for (const auto& f : functions_) {
411 f->resetNextRunTime(now);
412 VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
413 << ", period = " << f->intervalDescr
414 << ", delay = " << f->startDelay.count() << "ms";
415 }
416 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
417
418 thread_ = std::thread([&] { this->run(); });
419 running_ = true;
420
421 return true;
422}
423
424bool FunctionScheduler::shutdown() {
425 {
426 std::lock_guard<std::mutex> g(mutex_);
427 if (!running_) {
428 return false;
429 }
430
431 running_ = false;
432 runningCondvar_.notify_one();
433 }
434 thread_.join();
435 return true;
436}
437
438void FunctionScheduler::run() {
439 std::unique_lock<std::mutex> lock(mutex_);
440
441 if (!threadName_.empty()) {
442 folly::setThreadName(threadName_);
443 }
444
445 while (running_) {
446 // If we have nothing to run, wait until a function is added or until we
447 // are stopped.
448 if (functions_.empty()) {
449 runningCondvar_.wait(lock);
450 continue;
451 }
452
453 auto now = steady_clock::now();
454
455 // Move the next function to run to the end of functions_
456 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
457
458 // Check to see if the function was cancelled.
459 // If so, just remove it and continue around the loop.
460 if (!functions_.back()->isValid()) {
461 functions_.pop_back();
462 continue;
463 }
464
465 auto sleepTime = functions_.back()->getNextRunTime() - now;
466 if (sleepTime < milliseconds::zero()) {
467 // We need to run this function now
468 runOneFunction(lock, now);
469 runningCondvar_.notify_all();
470 } else {
471 // Re-add the function to the heap, and wait until we actually
472 // need to run it.
473 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
474 runningCondvar_.wait_for(lock, sleepTime);
475 }
476 }
477}
478
479void FunctionScheduler::runOneFunction(
480 std::unique_lock<std::mutex>& lock,
481 steady_clock::time_point now) {
482 DCHECK(lock.mutex() == &mutex_);
483 DCHECK(lock.owns_lock());
484
485 // The function to run will be at the end of functions_ already.
486 //
487 // Fully remove it from functions_ now.
488 // We need to release mutex_ while we invoke this function, and we need to
489 // maintain the heap property on functions_ while mutex_ is unlocked.
490 auto func = std::move(functions_.back());
491 functions_.pop_back();
492 if (!func->cb) {
493 VLOG(5) << func->name << "function has been canceled while waiting";
494 return;
495 }
496 currentFunction_ = func.get();
497 // Update the function's next run time.
498 if (steady_) {
499 // This allows scheduler to catch up
500 func->setNextRunTimeSteady();
501 } else {
502 // Note that we set nextRunTime based on the current time where we started
503 // the function call, rather than the time when the function finishes.
504 // This ensures that we call the function once every time interval, as
505 // opposed to waiting time interval seconds between calls. (These can be
506 // different if the function takes a significant amount of time to run.)
507 func->setNextRunTimeStrict(now);
508 }
509
510 // Release the lock while we invoke the user's function
511 lock.unlock();
512
513 // Invoke the function
514 try {
515 VLOG(5) << "Now running " << func->name;
516 func->cb();
517 } catch (const std::exception& ex) {
518 LOG(ERROR) << "Error running the scheduled function <" << func->name
519 << ">: " << exceptionStr(ex);
520 }
521
522 // Re-acquire the lock
523 lock.lock();
524
525 if (!currentFunction_) {
526 // The function was cancelled while we were running it.
527 // We shouldn't reschedule it;
528 cancellingCurrentFunction_ = false;
529 return;
530 }
531 if (currentFunction_->runOnce) {
532 // Don't reschedule if the function only needed to run once.
533 functionsMap_.erase(currentFunction_->name);
534 currentFunction_ = nullptr;
535 return;
536 }
537
538 // Re-insert the function into our functions_ heap.
539 // We only maintain the heap property while running_ is set. (running_ may
540 // have been cleared while we were invoking the user's function.)
541 functions_.push_back(std::move(func));
542
543 // Clear currentFunction_
544 currentFunction_ = nullptr;
545
546 if (running_) {
547 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
548 }
549}
550
551void FunctionScheduler::addFunctionToHeap(
552 const std::unique_lock<std::mutex>& lock,
553 std::unique_ptr<RepeatFunc> func) {
554 // This function should only be called with mutex_ already locked.
555 DCHECK(lock.mutex() == &mutex_);
556 DCHECK(lock.owns_lock());
557
558 functions_.push_back(std::move(func));
559 functionsMap_[functions_.back()->name] = functions_.back().get();
560 if (running_) {
561 functions_.back()->resetNextRunTime(steady_clock::now());
562 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
563 // Signal the running thread to wake up and see if it needs to change
564 // its current scheduling decision.
565 runningCondvar_.notify_one();
566 }
567}
568
569void FunctionScheduler::setThreadName(StringPiece threadName) {
570 std::unique_lock<std::mutex> l(mutex_);
571 threadName_ = threadName.str();
572}
573
574} // namespace folly
575