1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/experimental/FunctionScheduler.h> |
18 | |
19 | #include <random> |
20 | |
21 | #include <folly/Conv.h> |
22 | #include <folly/Random.h> |
23 | #include <folly/String.h> |
24 | #include <folly/system/ThreadName.h> |
25 | |
26 | using std::chrono::milliseconds; |
27 | using std::chrono::steady_clock; |
28 | |
29 | namespace folly { |
30 | |
31 | namespace { |
32 | |
// Next-run-time generator that keeps runs aligned to a fixed grid: the
// returned time is always curNextRunTime advanced by a whole number of
// intervals.
struct ConsistentDelayFunctor {
  const milliseconds constInterval;

  // Throws std::invalid_argument when `interval` is negative. A zero interval
  // is accepted.
  explicit ConsistentDelayFunctor(milliseconds interval)
      : constInterval(interval) {
    if (interval < milliseconds::zero()) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "time interval must be non-negative");
    }
  }

  // Returns curNextRunTime + (intervalsPassed + 1) * constInterval, where
  // intervalsPassed is the (truncated) number of whole intervals between
  // curNextRunTime and curTime.
  steady_clock::time_point operator()(
      steady_clock::time_point curNextRunTime,
      steady_clock::time_point curTime) const {
    if (constInterval == milliseconds::zero()) {
      // The constructor allows a zero interval, but dividing two integer
      // durations by it below would be a division by zero. With no interval
      // to align to, the next run is simply due at curTime.
      return curTime;
    }
    auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
    return (intervalsPassed + 1) * constInterval + curNextRunTime;
  }
};
52 | |
// Interval generator that yields the same fixed interval on every call.
struct ConstIntervalFunctor {
  const milliseconds constInterval;

  // Rejects negative intervals with std::invalid_argument; zero is allowed.
  explicit ConstIntervalFunctor(milliseconds interval)
      : constInterval(interval) {
    const bool isNegative = interval.count() < 0;
    if (isNegative) {
      throw std::invalid_argument(
          "FunctionScheduler: time interval must be non-negative");
    }
  }

  // The interval handed to the constructor, unchanged.
  milliseconds operator()() const {
    return constInterval;
  }
};
69 | |
// Interval generator that draws each interval (in milliseconds) from a
// Poisson distribution with the given mean.
//
// NOTE(review): `generator` is default-constructed, i.e. not seeded, so every
// instance produces the same sequence. UniformDistributionFunctor seeds its
// engine; confirm whether the difference is intentional.
struct PoissonDistributionFunctor {
  std::default_random_engine generator;
  std::poisson_distribution<int> poissonRandom;
  // True when the requested mean is exactly zero. std::poisson_distribution
  // requires a strictly positive mean, so that degenerate case (every sample
  // is 0) is answered without consulting the distribution.
  bool zeroMean;

  // Throws std::invalid_argument when `meanPoissonMs` is negative or NaN.
  // The check runs before the distribution is constructed so an invalid mean
  // never reaches std::poisson_distribution.
  explicit PoissonDistributionFunctor(double meanPoissonMs)
      : poissonRandom(distributionMean(meanPoissonMs)),
        zeroMean(meanPoissonMs == 0.0) {}

  milliseconds operator()() {
    if (zeroMean) {
      return milliseconds::zero();
    }
    return milliseconds(poissonRandom(generator));
  }

 private:
  // Validates the requested mean and maps it to a value that is legal for
  // std::poisson_distribution (the placeholder used for a zero mean is never
  // sampled, see zeroMean).
  static double distributionMean(double meanPoissonMs) {
    // Written as a negated >= so that NaN is rejected as well.
    if (!(meanPoissonMs >= 0.0)) {
      throw std::invalid_argument(
          "FunctionScheduler: "
          "Poisson mean interval must be non-negative");
    }
    return meanPoissonMs > 0.0 ? meanPoissonMs : 1.0;
  }
};
87 | |
88 | struct UniformDistributionFunctor { |
89 | std::default_random_engine generator; |
90 | std::uniform_int_distribution<milliseconds::rep> dist; |
91 | |
92 | UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval) |
93 | : generator(Random::rand32()), |
94 | dist(minInterval.count(), maxInterval.count()) { |
95 | if (minInterval > maxInterval) { |
96 | throw std::invalid_argument( |
97 | "FunctionScheduler: " |
98 | "min time interval must be less or equal than max interval" ); |
99 | } |
100 | if (minInterval < milliseconds::zero()) { |
101 | throw std::invalid_argument( |
102 | "FunctionScheduler: " |
103 | "time interval must be non-negative" ); |
104 | } |
105 | } |
106 | |
107 | milliseconds operator()() { |
108 | return milliseconds(dist(generator)); |
109 | } |
110 | }; |
111 | |
112 | } // namespace |
113 | |
114 | FunctionScheduler::FunctionScheduler() {} |
115 | |
116 | FunctionScheduler::~FunctionScheduler() { |
117 | // make sure to stop the thread (if running) |
118 | shutdown(); |
119 | } |
120 | |
121 | void FunctionScheduler::addFunction( |
122 | Function<void()>&& cb, |
123 | milliseconds interval, |
124 | StringPiece nameID, |
125 | milliseconds startDelay) { |
126 | addFunctionInternal( |
127 | std::move(cb), |
128 | ConstIntervalFunctor(interval), |
129 | nameID.str(), |
130 | to<std::string>(interval.count(), "ms" ), |
131 | startDelay, |
132 | false /*runOnce*/); |
133 | } |
134 | |
135 | void FunctionScheduler::addFunction( |
136 | Function<void()>&& cb, |
137 | milliseconds interval, |
138 | const LatencyDistribution& latencyDistr, |
139 | StringPiece nameID, |
140 | milliseconds startDelay) { |
141 | if (latencyDistr.isPoisson) { |
142 | addFunctionInternal( |
143 | std::move(cb), |
144 | PoissonDistributionFunctor(latencyDistr.poissonMean), |
145 | nameID.str(), |
146 | to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)" ), |
147 | startDelay, |
148 | false /*runOnce*/); |
149 | } else { |
150 | addFunction(std::move(cb), interval, nameID, startDelay); |
151 | } |
152 | } |
153 | |
154 | void FunctionScheduler::addFunctionOnce( |
155 | Function<void()>&& cb, |
156 | StringPiece nameID, |
157 | milliseconds startDelay) { |
158 | addFunctionInternal( |
159 | std::move(cb), |
160 | ConstIntervalFunctor(milliseconds::zero()), |
161 | nameID.str(), |
162 | "once" , |
163 | startDelay, |
164 | true /*runOnce*/); |
165 | } |
166 | |
167 | void FunctionScheduler::addFunctionUniformDistribution( |
168 | Function<void()>&& cb, |
169 | milliseconds minInterval, |
170 | milliseconds maxInterval, |
171 | StringPiece nameID, |
172 | milliseconds startDelay) { |
173 | addFunctionInternal( |
174 | std::move(cb), |
175 | UniformDistributionFunctor(minInterval, maxInterval), |
176 | nameID.str(), |
177 | to<std::string>( |
178 | "[" , minInterval.count(), " , " , maxInterval.count(), "] ms" ), |
179 | startDelay, |
180 | false /*runOnce*/); |
181 | } |
182 | |
183 | void FunctionScheduler::addFunctionConsistentDelay( |
184 | Function<void()>&& cb, |
185 | milliseconds interval, |
186 | StringPiece nameID, |
187 | milliseconds startDelay) { |
188 | addFunctionInternal( |
189 | std::move(cb), |
190 | ConsistentDelayFunctor(interval), |
191 | nameID.str(), |
192 | to<std::string>(interval.count(), "ms" ), |
193 | startDelay, |
194 | false /*runOnce*/); |
195 | } |
196 | |
197 | void FunctionScheduler::addFunctionGenericDistribution( |
198 | Function<void()>&& cb, |
199 | IntervalDistributionFunc&& intervalFunc, |
200 | const std::string& nameID, |
201 | const std::string& intervalDescr, |
202 | milliseconds startDelay) { |
203 | addFunctionInternal( |
204 | std::move(cb), |
205 | std::move(intervalFunc), |
206 | nameID, |
207 | intervalDescr, |
208 | startDelay, |
209 | false /*runOnce*/); |
210 | } |
211 | |
212 | void FunctionScheduler::addFunctionGenericNextRunTimeFunctor( |
213 | Function<void()>&& cb, |
214 | NextRunTimeFunc&& fn, |
215 | const std::string& nameID, |
216 | const std::string& intervalDescr, |
217 | milliseconds startDelay) { |
218 | addFunctionInternal( |
219 | std::move(cb), |
220 | std::move(fn), |
221 | nameID, |
222 | intervalDescr, |
223 | startDelay, |
224 | false /*runOnce*/); |
225 | } |
226 | |
227 | template <typename RepeatFuncNextRunTimeFunc> |
228 | void FunctionScheduler::addFunctionToHeapChecked( |
229 | Function<void()>&& cb, |
230 | RepeatFuncNextRunTimeFunc&& fn, |
231 | const std::string& nameID, |
232 | const std::string& intervalDescr, |
233 | milliseconds startDelay, |
234 | bool runOnce) { |
235 | if (!cb) { |
236 | throw std::invalid_argument( |
237 | "FunctionScheduler: Scheduled function must be set" ); |
238 | } |
239 | if (!fn) { |
240 | throw std::invalid_argument( |
241 | "FunctionScheduler: " |
242 | "interval distribution or next run time function must be set" ); |
243 | } |
244 | if (startDelay < milliseconds::zero()) { |
245 | throw std::invalid_argument( |
246 | "FunctionScheduler: start delay must be non-negative" ); |
247 | } |
248 | |
249 | std::unique_lock<std::mutex> l(mutex_); |
250 | auto it = functionsMap_.find(nameID); |
251 | // check if the nameID is unique |
252 | if (it != functionsMap_.end() && it->second->isValid()) { |
253 | throw std::invalid_argument(to<std::string>( |
254 | "FunctionScheduler: a function named \"" , nameID, "\" already exists" )); |
255 | } |
256 | |
257 | if (currentFunction_ && currentFunction_->name == nameID) { |
258 | throw std::invalid_argument(to<std::string>( |
259 | "FunctionScheduler: a function named \"" , nameID, "\" already exists" )); |
260 | } |
261 | |
262 | addFunctionToHeap( |
263 | l, |
264 | std::make_unique<RepeatFunc>( |
265 | std::move(cb), |
266 | std::forward<RepeatFuncNextRunTimeFunc>(fn), |
267 | nameID, |
268 | intervalDescr, |
269 | startDelay, |
270 | runOnce)); |
271 | } |
272 | |
273 | void FunctionScheduler::addFunctionInternal( |
274 | Function<void()>&& cb, |
275 | NextRunTimeFunc&& fn, |
276 | const std::string& nameID, |
277 | const std::string& intervalDescr, |
278 | milliseconds startDelay, |
279 | bool runOnce) { |
280 | return addFunctionToHeapChecked( |
281 | std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce); |
282 | } |
283 | |
284 | void FunctionScheduler::addFunctionInternal( |
285 | Function<void()>&& cb, |
286 | IntervalDistributionFunc&& fn, |
287 | const std::string& nameID, |
288 | const std::string& intervalDescr, |
289 | milliseconds startDelay, |
290 | bool runOnce) { |
291 | return addFunctionToHeapChecked( |
292 | std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce); |
293 | } |
294 | |
295 | bool FunctionScheduler::cancelFunctionWithLock( |
296 | std::unique_lock<std::mutex>& lock, |
297 | StringPiece nameID) { |
298 | CHECK_EQ(lock.owns_lock(), true); |
299 | if (currentFunction_ && currentFunction_->name == nameID) { |
300 | functionsMap_.erase(currentFunction_->name); |
301 | // This function is currently being run. Clear currentFunction_ |
302 | // The running thread will see this and won't reschedule the function. |
303 | currentFunction_ = nullptr; |
304 | cancellingCurrentFunction_ = true; |
305 | return true; |
306 | } |
307 | return false; |
308 | } |
309 | |
310 | bool FunctionScheduler::cancelFunction(StringPiece nameID) { |
311 | std::unique_lock<std::mutex> l(mutex_); |
312 | if (cancelFunctionWithLock(l, nameID)) { |
313 | return true; |
314 | } |
315 | auto it = functionsMap_.find(nameID); |
316 | if (it != functionsMap_.end() && it->second->isValid()) { |
317 | cancelFunction(l, it->second); |
318 | return true; |
319 | } |
320 | |
321 | return false; |
322 | } |
323 | |
324 | bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) { |
325 | std::unique_lock<std::mutex> l(mutex_); |
326 | |
327 | if (cancelFunctionWithLock(l, nameID)) { |
328 | runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; }); |
329 | return true; |
330 | } |
331 | |
332 | auto it = functionsMap_.find(nameID); |
333 | if (it != functionsMap_.end() && it->second->isValid()) { |
334 | cancelFunction(l, it->second); |
335 | return true; |
336 | } |
337 | return false; |
338 | } |
339 | |
340 | void FunctionScheduler::cancelFunction( |
341 | const std::unique_lock<std::mutex>& l, |
342 | RepeatFunc* it) { |
343 | // This function should only be called with mutex_ already locked. |
344 | DCHECK(l.mutex() == &mutex_); |
345 | DCHECK(l.owns_lock()); |
346 | functionsMap_.erase(it->name); |
347 | it->cancel(); |
348 | } |
349 | |
350 | bool FunctionScheduler::cancelAllFunctionsWithLock( |
351 | std::unique_lock<std::mutex>& lock) { |
352 | CHECK_EQ(lock.owns_lock(), true); |
353 | functions_.clear(); |
354 | functionsMap_.clear(); |
355 | if (currentFunction_) { |
356 | cancellingCurrentFunction_ = true; |
357 | } |
358 | currentFunction_ = nullptr; |
359 | return cancellingCurrentFunction_; |
360 | } |
361 | |
362 | void FunctionScheduler::cancelAllFunctions() { |
363 | std::unique_lock<std::mutex> l(mutex_); |
364 | cancelAllFunctionsWithLock(l); |
365 | } |
366 | |
367 | void FunctionScheduler::cancelAllFunctionsAndWait() { |
368 | std::unique_lock<std::mutex> l(mutex_); |
369 | if (cancelAllFunctionsWithLock(l)) { |
370 | runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; }); |
371 | } |
372 | } |
373 | |
374 | bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) { |
375 | std::unique_lock<std::mutex> l(mutex_); |
376 | if (currentFunction_ && currentFunction_->name == nameID) { |
377 | if (cancellingCurrentFunction_ || currentFunction_->runOnce) { |
378 | return false; |
379 | } |
380 | currentFunction_->resetNextRunTime(steady_clock::now()); |
381 | return true; |
382 | } |
383 | |
384 | // Since __adjust_heap() isn't a part of the standard API, there's no way to |
385 | // fix the heap ordering if we adjust the key (nextRunTime) for the existing |
386 | // RepeatFunc. Instead, we just cancel it and add an identical object. |
387 | auto it = functionsMap_.find(nameID); |
388 | if (it != functionsMap_.end() && it->second->isValid()) { |
389 | if (running_) { |
390 | it->second->resetNextRunTime(steady_clock::now()); |
391 | std::make_heap(functions_.begin(), functions_.end(), fnCmp_); |
392 | runningCondvar_.notify_one(); |
393 | } |
394 | return true; |
395 | } |
396 | return false; |
397 | } |
398 | |
399 | bool FunctionScheduler::start() { |
400 | std::unique_lock<std::mutex> l(mutex_); |
401 | if (running_) { |
402 | return false; |
403 | } |
404 | |
405 | VLOG(1) << "Starting FunctionScheduler with " << functions_.size() |
406 | << " functions." ; |
407 | auto now = steady_clock::now(); |
408 | // Reset the next run time. for all functions. |
409 | // note: this is needed since one can shutdown() and start() again |
410 | for (const auto& f : functions_) { |
411 | f->resetNextRunTime(now); |
412 | VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str()) |
413 | << ", period = " << f->intervalDescr |
414 | << ", delay = " << f->startDelay.count() << "ms" ; |
415 | } |
416 | std::make_heap(functions_.begin(), functions_.end(), fnCmp_); |
417 | |
418 | thread_ = std::thread([&] { this->run(); }); |
419 | running_ = true; |
420 | |
421 | return true; |
422 | } |
423 | |
424 | bool FunctionScheduler::shutdown() { |
425 | { |
426 | std::lock_guard<std::mutex> g(mutex_); |
427 | if (!running_) { |
428 | return false; |
429 | } |
430 | |
431 | running_ = false; |
432 | runningCondvar_.notify_one(); |
433 | } |
434 | thread_.join(); |
435 | return true; |
436 | } |
437 | |
438 | void FunctionScheduler::run() { |
439 | std::unique_lock<std::mutex> lock(mutex_); |
440 | |
441 | if (!threadName_.empty()) { |
442 | folly::setThreadName(threadName_); |
443 | } |
444 | |
445 | while (running_) { |
446 | // If we have nothing to run, wait until a function is added or until we |
447 | // are stopped. |
448 | if (functions_.empty()) { |
449 | runningCondvar_.wait(lock); |
450 | continue; |
451 | } |
452 | |
453 | auto now = steady_clock::now(); |
454 | |
455 | // Move the next function to run to the end of functions_ |
456 | std::pop_heap(functions_.begin(), functions_.end(), fnCmp_); |
457 | |
458 | // Check to see if the function was cancelled. |
459 | // If so, just remove it and continue around the loop. |
460 | if (!functions_.back()->isValid()) { |
461 | functions_.pop_back(); |
462 | continue; |
463 | } |
464 | |
465 | auto sleepTime = functions_.back()->getNextRunTime() - now; |
466 | if (sleepTime < milliseconds::zero()) { |
467 | // We need to run this function now |
468 | runOneFunction(lock, now); |
469 | runningCondvar_.notify_all(); |
470 | } else { |
471 | // Re-add the function to the heap, and wait until we actually |
472 | // need to run it. |
473 | std::push_heap(functions_.begin(), functions_.end(), fnCmp_); |
474 | runningCondvar_.wait_for(lock, sleepTime); |
475 | } |
476 | } |
477 | } |
478 | |
479 | void FunctionScheduler::runOneFunction( |
480 | std::unique_lock<std::mutex>& lock, |
481 | steady_clock::time_point now) { |
482 | DCHECK(lock.mutex() == &mutex_); |
483 | DCHECK(lock.owns_lock()); |
484 | |
485 | // The function to run will be at the end of functions_ already. |
486 | // |
487 | // Fully remove it from functions_ now. |
488 | // We need to release mutex_ while we invoke this function, and we need to |
489 | // maintain the heap property on functions_ while mutex_ is unlocked. |
490 | auto func = std::move(functions_.back()); |
491 | functions_.pop_back(); |
492 | if (!func->cb) { |
493 | VLOG(5) << func->name << "function has been canceled while waiting" ; |
494 | return; |
495 | } |
496 | currentFunction_ = func.get(); |
497 | // Update the function's next run time. |
498 | if (steady_) { |
499 | // This allows scheduler to catch up |
500 | func->setNextRunTimeSteady(); |
501 | } else { |
502 | // Note that we set nextRunTime based on the current time where we started |
503 | // the function call, rather than the time when the function finishes. |
504 | // This ensures that we call the function once every time interval, as |
505 | // opposed to waiting time interval seconds between calls. (These can be |
506 | // different if the function takes a significant amount of time to run.) |
507 | func->setNextRunTimeStrict(now); |
508 | } |
509 | |
510 | // Release the lock while we invoke the user's function |
511 | lock.unlock(); |
512 | |
513 | // Invoke the function |
514 | try { |
515 | VLOG(5) << "Now running " << func->name; |
516 | func->cb(); |
517 | } catch (const std::exception& ex) { |
518 | LOG(ERROR) << "Error running the scheduled function <" << func->name |
519 | << ">: " << exceptionStr(ex); |
520 | } |
521 | |
522 | // Re-acquire the lock |
523 | lock.lock(); |
524 | |
525 | if (!currentFunction_) { |
526 | // The function was cancelled while we were running it. |
527 | // We shouldn't reschedule it; |
528 | cancellingCurrentFunction_ = false; |
529 | return; |
530 | } |
531 | if (currentFunction_->runOnce) { |
532 | // Don't reschedule if the function only needed to run once. |
533 | functionsMap_.erase(currentFunction_->name); |
534 | currentFunction_ = nullptr; |
535 | return; |
536 | } |
537 | |
538 | // Re-insert the function into our functions_ heap. |
539 | // We only maintain the heap property while running_ is set. (running_ may |
540 | // have been cleared while we were invoking the user's function.) |
541 | functions_.push_back(std::move(func)); |
542 | |
543 | // Clear currentFunction_ |
544 | currentFunction_ = nullptr; |
545 | |
546 | if (running_) { |
547 | std::push_heap(functions_.begin(), functions_.end(), fnCmp_); |
548 | } |
549 | } |
550 | |
551 | void FunctionScheduler::addFunctionToHeap( |
552 | const std::unique_lock<std::mutex>& lock, |
553 | std::unique_ptr<RepeatFunc> func) { |
554 | // This function should only be called with mutex_ already locked. |
555 | DCHECK(lock.mutex() == &mutex_); |
556 | DCHECK(lock.owns_lock()); |
557 | |
558 | functions_.push_back(std::move(func)); |
559 | functionsMap_[functions_.back()->name] = functions_.back().get(); |
560 | if (running_) { |
561 | functions_.back()->resetNextRunTime(steady_clock::now()); |
562 | std::push_heap(functions_.begin(), functions_.end(), fnCmp_); |
563 | // Signal the running thread to wake up and see if it needs to change |
564 | // its current scheduling decision. |
565 | runningCondvar_.notify_one(); |
566 | } |
567 | } |
568 | |
569 | void FunctionScheduler::setThreadName(StringPiece threadName) { |
570 | std::unique_lock<std::mutex> l(mutex_); |
571 | threadName_ = threadName.str(); |
572 | } |
573 | |
574 | } // namespace folly |
575 | |