1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <cstdio>
20#include <memory>
21#include <mutex>
22#include <queue>
23
24#include <folly/executors/DrivableExecutor.h>
25#include <folly/executors/ScheduledExecutor.h>
26#include <folly/executors/SequencedExecutor.h>
27#include <folly/synchronization/LifoSem.h>
28
29namespace folly {
30/// A ManualExecutor only does work when you turn the crank, by calling
31/// run() or indirectly with makeProgress() or waitFor().
32///
33/// The clock for a manual executor starts at 0 and advances only when you
34/// ask it to. i.e. time is also under manual control.
35///
36/// NB No attempt has been made to make anything other than add and schedule
37/// threadsafe.
38class ManualExecutor : public DrivableExecutor,
39 public ScheduledExecutor,
40 public SequencedExecutor {
41 public:
42 ~ManualExecutor();
43
44 void add(Func) override;
45
46 /// Do work. Returns the number of functions that were executed (maybe 0).
47 /// Non-blocking, in the sense that we don't wait for work (we can't
48 /// control whether one of the functions blocks).
49 /// This is stable, it will not chase an ever-increasing tail of work.
50 /// This also means, there may be more work available to perform at the
51 /// moment that this returns.
52 size_t run();
53
54 // Do work until there is no more work to do.
55 // Returns the number of functions that were executed (maybe 0).
56 // Unlike run, this method is not stable. It will chase an infinite tail of
57 // work so should be used with care.
58 // There will be no work available to perform at the moment that this
59 // returns.
60 size_t drain();
61
62 /// Wait for work to do.
63 void wait();
64
65 /// Wait for work to do, and do it.
66 void makeProgress() {
67 wait();
68 run();
69 }
70
71 /// Implements DrivableExecutor
72 void drive() override {
73 makeProgress();
74 }
75
76 /// makeProgress until this Future is ready.
77 template <class F>
78 void waitFor(F const& f) {
79 // TODO(5427828)
80#if 0
81 while (!f.isReady())
82 makeProgress();
83#else
84 while (!f.isReady()) {
85 run();
86 }
87#endif
88 }
89
90 void scheduleAt(Func&& f, TimePoint const& t) override {
91 std::lock_guard<std::mutex> lock(lock_);
92 scheduledFuncs_.emplace(t, std::move(f));
93 sem_.post();
94 }
95
96 /// Advance the clock. The clock never advances on its own.
97 /// Advancing the clock causes some work to be done, if work is available
98 /// to do (perhaps newly available because of the advanced clock).
99 /// If dur is <= 0 this is a noop.
100 void advance(Duration const& dur) {
101 advanceTo(now_ + dur);
102 }
103
104 /// Advance the clock to this absolute time. If t is <= now(),
105 /// this is a noop.
106 void advanceTo(TimePoint const& t);
107
108 TimePoint now() override {
109 return now_;
110 }
111
112 /// Flush the function queue. Destroys all stored functions without
113 /// executing them. Returns number of removed functions.
114 std::size_t clear() {
115 std::queue<Func> funcs;
116 std::priority_queue<ScheduledFunc> scheduled_funcs;
117
118 {
119 std::lock_guard<std::mutex> lock(lock_);
120 funcs_.swap(funcs);
121 scheduledFuncs_.swap(scheduled_funcs);
122 }
123
124 return funcs.size() + scheduled_funcs.size();
125 }
126
127 private:
128 std::mutex lock_;
129 std::queue<Func> funcs_;
130 LifoSem sem_;
131
132 // helper class to enable ordering of scheduled events in the priority
133 // queue
134 struct ScheduledFunc {
135 TimePoint time;
136 size_t ordinal;
137 Func mutable func;
138
139 ScheduledFunc(TimePoint const& t, Func&& f) : time(t), func(std::move(f)) {
140 static size_t seq = 0;
141 ordinal = seq++;
142 }
143
144 bool operator<(ScheduledFunc const& b) const {
145 // Earlier-scheduled things must be *higher* priority
146 // in the max-based std::priority_queue
147 if (time == b.time) {
148 return ordinal > b.ordinal;
149 }
150 return time > b.time;
151 }
152
153 Func&& moveOutFunc() const {
154 return std::move(func);
155 }
156 };
157 std::priority_queue<ScheduledFunc> scheduledFuncs_;
158 TimePoint now_ = TimePoint::min();
159};
160
161} // namespace folly
162