1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/executors/ManualExecutor.h>
18
19#include <string.h>
20#include <string>
21#include <tuple>
22
23namespace folly {
24
25ManualExecutor::~ManualExecutor() {
26 drain();
27}
28
29void ManualExecutor::add(Func callback) {
30 std::lock_guard<std::mutex> lock(lock_);
31 funcs_.emplace(std::move(callback));
32 sem_.post();
33}
34
35size_t ManualExecutor::run() {
36 size_t count;
37 size_t n;
38 Func func;
39
40 {
41 std::lock_guard<std::mutex> lock(lock_);
42
43 while (!scheduledFuncs_.empty()) {
44 auto& sf = scheduledFuncs_.top();
45 if (sf.time > now_) {
46 break;
47 }
48 funcs_.emplace(sf.moveOutFunc());
49 scheduledFuncs_.pop();
50 }
51
52 n = funcs_.size();
53 }
54
55 for (count = 0; count < n; count++) {
56 {
57 std::lock_guard<std::mutex> lock(lock_);
58 if (funcs_.empty()) {
59 break;
60 }
61
62 // Balance the semaphore so it doesn't grow without bound
63 // if nobody is calling wait().
64 // This may fail (with EAGAIN), that's fine.
65 sem_.tryWait();
66
67 func = std::move(funcs_.front());
68 funcs_.pop();
69 }
70 func();
71 }
72
73 return count;
74}
75
76size_t ManualExecutor::drain() {
77 size_t tasksRun = 0;
78 size_t tasksForSingleRun = 0;
79 while ((tasksForSingleRun = run()) != 0) {
80 tasksRun += tasksForSingleRun;
81 }
82 return tasksRun;
83}
84
85void ManualExecutor::wait() {
86 while (true) {
87 {
88 std::lock_guard<std::mutex> lock(lock_);
89 if (!funcs_.empty()) {
90 break;
91 }
92 }
93
94 sem_.wait();
95 }
96}
97
98void ManualExecutor::advanceTo(TimePoint const& t) {
99 if (t > now_) {
100 now_ = t;
101 }
102 run();
103}
104
105} // namespace folly
106