1 | /* |
2 | * Copyright 2014-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <cstdio> |
20 | #include <memory> |
21 | #include <mutex> |
22 | #include <queue> |
23 | |
24 | #include <folly/executors/DrivableExecutor.h> |
25 | #include <folly/executors/ScheduledExecutor.h> |
26 | #include <folly/executors/SequencedExecutor.h> |
27 | #include <folly/synchronization/LifoSem.h> |
28 | |
29 | namespace folly { |
30 | /// A ManualExecutor only does work when you turn the crank, by calling |
31 | /// run() or indirectly with makeProgress() or waitFor(). |
32 | /// |
33 | /// The clock for a manual executor starts at 0 and advances only when you |
34 | /// ask it to. i.e. time is also under manual control. |
35 | /// |
36 | /// NB No attempt has been made to make anything other than add and schedule |
37 | /// threadsafe. |
38 | class ManualExecutor : public DrivableExecutor, |
39 | public ScheduledExecutor, |
40 | public SequencedExecutor { |
41 | public: |
42 | ~ManualExecutor(); |
43 | |
44 | void add(Func) override; |
45 | |
46 | /// Do work. Returns the number of functions that were executed (maybe 0). |
47 | /// Non-blocking, in the sense that we don't wait for work (we can't |
48 | /// control whether one of the functions blocks). |
49 | /// This is stable, it will not chase an ever-increasing tail of work. |
50 | /// This also means, there may be more work available to perform at the |
51 | /// moment that this returns. |
52 | size_t run(); |
53 | |
54 | // Do work until there is no more work to do. |
55 | // Returns the number of functions that were executed (maybe 0). |
56 | // Unlike run, this method is not stable. It will chase an infinite tail of |
57 | // work so should be used with care. |
58 | // There will be no work available to perform at the moment that this |
59 | // returns. |
60 | size_t drain(); |
61 | |
62 | /// Wait for work to do. |
63 | void wait(); |
64 | |
65 | /// Wait for work to do, and do it. |
66 | void makeProgress() { |
67 | wait(); |
68 | run(); |
69 | } |
70 | |
71 | /// Implements DrivableExecutor |
72 | void drive() override { |
73 | makeProgress(); |
74 | } |
75 | |
76 | /// makeProgress until this Future is ready. |
77 | template <class F> |
78 | void waitFor(F const& f) { |
79 | // TODO(5427828) |
80 | #if 0 |
81 | while (!f.isReady()) |
82 | makeProgress(); |
83 | #else |
84 | while (!f.isReady()) { |
85 | run(); |
86 | } |
87 | #endif |
88 | } |
89 | |
90 | void scheduleAt(Func&& f, TimePoint const& t) override { |
91 | std::lock_guard<std::mutex> lock(lock_); |
92 | scheduledFuncs_.emplace(t, std::move(f)); |
93 | sem_.post(); |
94 | } |
95 | |
96 | /// Advance the clock. The clock never advances on its own. |
97 | /// Advancing the clock causes some work to be done, if work is available |
98 | /// to do (perhaps newly available because of the advanced clock). |
99 | /// If dur is <= 0 this is a noop. |
100 | void advance(Duration const& dur) { |
101 | advanceTo(now_ + dur); |
102 | } |
103 | |
104 | /// Advance the clock to this absolute time. If t is <= now(), |
105 | /// this is a noop. |
106 | void advanceTo(TimePoint const& t); |
107 | |
108 | TimePoint now() override { |
109 | return now_; |
110 | } |
111 | |
112 | /// Flush the function queue. Destroys all stored functions without |
113 | /// executing them. Returns number of removed functions. |
114 | std::size_t clear() { |
115 | std::queue<Func> funcs; |
116 | std::priority_queue<ScheduledFunc> scheduled_funcs; |
117 | |
118 | { |
119 | std::lock_guard<std::mutex> lock(lock_); |
120 | funcs_.swap(funcs); |
121 | scheduledFuncs_.swap(scheduled_funcs); |
122 | } |
123 | |
124 | return funcs.size() + scheduled_funcs.size(); |
125 | } |
126 | |
127 | private: |
128 | std::mutex lock_; |
129 | std::queue<Func> funcs_; |
130 | LifoSem sem_; |
131 | |
132 | // helper class to enable ordering of scheduled events in the priority |
133 | // queue |
134 | struct ScheduledFunc { |
135 | TimePoint time; |
136 | size_t ordinal; |
137 | Func mutable func; |
138 | |
139 | ScheduledFunc(TimePoint const& t, Func&& f) : time(t), func(std::move(f)) { |
140 | static size_t seq = 0; |
141 | ordinal = seq++; |
142 | } |
143 | |
144 | bool operator<(ScheduledFunc const& b) const { |
145 | // Earlier-scheduled things must be *higher* priority |
146 | // in the max-based std::priority_queue |
147 | if (time == b.time) { |
148 | return ordinal > b.ordinal; |
149 | } |
150 | return time > b.time; |
151 | } |
152 | |
153 | Func&& moveOutFunc() const { |
154 | return std::move(func); |
155 | } |
156 | }; |
157 | std::priority_queue<ScheduledFunc> scheduledFuncs_; |
158 | TimePoint now_ = TimePoint::min(); |
159 | }; |
160 | |
161 | } // namespace folly |
162 | |