1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <sys/types.h> |
20 | |
21 | #include <atomic> |
22 | #include <cstdint> |
23 | #include <deque> |
24 | #include <functional> |
25 | #include <iosfwd> |
26 | #include <mutex> |
27 | #include <utility> |
28 | #include <vector> |
29 | |
30 | #include <boost/noncopyable.hpp> |
31 | #include <libaio.h> |
32 | |
33 | #include <folly/Portability.h> |
34 | #include <folly/Range.h> |
35 | #include <folly/portability/SysUio.h> |
36 | |
37 | namespace folly { |
38 | |
39 | /** |
40 | * An AsyncIOOp represents a pending operation. You may set a notification |
41 | * callback or you may use this class's methods directly. |
42 | * |
43 | * The op must remain allocated until it is completed or canceled. |
44 | */ |
45 | class AsyncIOOp : private boost::noncopyable { |
46 | friend class AsyncIO; |
47 | friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o); |
48 | |
49 | public: |
50 | typedef std::function<void(AsyncIOOp*)> NotificationCallback; |
51 | |
52 | explicit AsyncIOOp(NotificationCallback cb = NotificationCallback()); |
53 | ~AsyncIOOp(); |
54 | |
55 | enum class State { |
56 | UNINITIALIZED, |
57 | INITIALIZED, |
58 | PENDING, |
59 | COMPLETED, |
60 | CANCELED, |
61 | }; |
62 | |
63 | /** |
64 | * Initiate a read request. |
65 | */ |
66 | void pread(int fd, void* buf, size_t size, off_t start); |
67 | void pread(int fd, Range<unsigned char*> range, off_t start); |
68 | void preadv(int fd, const iovec* iov, int iovcnt, off_t start); |
69 | |
70 | /** |
71 | * Initiate a write request. |
72 | */ |
73 | void pwrite(int fd, const void* buf, size_t size, off_t start); |
74 | void pwrite(int fd, Range<const unsigned char*> range, off_t start); |
75 | void pwritev(int fd, const iovec* iov, int iovcnt, off_t start); |
76 | |
77 | /** |
78 | * Return the current operation state. |
79 | */ |
80 | State state() const { |
81 | return state_; |
82 | } |
83 | |
84 | /** |
85 | * Reset the operation for reuse. It is an error to call reset() on |
86 | * an Op that is still pending. |
87 | */ |
88 | void reset(NotificationCallback cb = NotificationCallback()); |
89 | |
90 | void setNotificationCallback(NotificationCallback cb) { |
91 | cb_ = std::move(cb); |
92 | } |
93 | const NotificationCallback& notificationCallback() const { |
94 | return cb_; |
95 | } |
96 | |
97 | /** |
98 | * Retrieve the result of this operation. Returns >=0 on success, |
99 | * -errno on failure (that is, using the Linux kernel error reporting |
100 | * conventions). Use checkKernelError (folly/Exception.h) on the result to |
101 | * throw a std::system_error in case of error instead. |
102 | * |
103 | * It is an error to call this if the Op hasn't completed. |
104 | */ |
105 | ssize_t result() const; |
106 | |
107 | private: |
108 | void init(); |
109 | void start(); |
110 | void complete(ssize_t result); |
111 | void cancel(); |
112 | |
113 | NotificationCallback cb_; |
114 | iocb iocb_; |
115 | State state_; |
116 | ssize_t result_; |
117 | }; |
118 | |
// Stream-insertion overloads for an op and for an op state. Declarations
// only; the definitions are not part of this header.
std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
121 | |
122 | /** |
123 | * C++ interface around Linux Async IO. |
124 | */ |
125 | class AsyncIO : private boost::noncopyable { |
126 | public: |
127 | typedef AsyncIOOp Op; |
128 | |
129 | enum PollMode { |
130 | NOT_POLLABLE, |
131 | POLLABLE, |
132 | }; |
133 | |
134 | /** |
135 | * Create an AsyncIO context capable of holding at most 'capacity' pending |
136 | * requests at the same time. As requests complete, others can be scheduled, |
137 | * as long as this limit is not exceeded. |
138 | * |
139 | * Note: the maximum number of allowed concurrent requests is controlled |
140 | * by the fs.aio-max-nr sysctl, the default value is usually 64K. |
141 | * |
142 | * If pollMode is POLLABLE, pollFd() will return a file descriptor that |
143 | * can be passed to poll / epoll / select and will become readable when |
144 | * any IOs on this AsyncIO have completed. If you do this, you must use |
145 | * pollCompleted() instead of wait() -- do not read from the pollFd() |
146 | * file descriptor directly. |
147 | * |
148 | * You may use the same AsyncIO object from multiple threads, as long as |
149 | * there is only one concurrent caller of wait() / pollCompleted() / cancel() |
150 | * (perhaps by always calling it from the same thread, or by providing |
151 | * appropriate mutual exclusion). In this case, pending() returns a snapshot |
152 | * of the current number of pending requests. |
153 | */ |
154 | explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE); |
155 | ~AsyncIO(); |
156 | |
157 | /** |
158 | * Wait for at least minRequests to complete. Returns the requests that |
159 | * have completed; the returned range is valid until the next call to |
160 | * wait(). minRequests may be 0 to not block. |
161 | */ |
162 | Range<Op**> wait(size_t minRequests); |
163 | |
164 | /** |
165 | * Cancel all pending requests and return them; the returned range is |
166 | * valid until the next call to cancel(). |
167 | */ |
168 | Range<Op**> cancel(); |
169 | |
170 | /** |
171 | * Return the number of pending requests. |
172 | */ |
173 | size_t pending() const { |
174 | return pending_; |
175 | } |
176 | |
177 | /** |
178 | * Return the maximum number of requests that can be kept outstanding |
179 | * at any one time. |
180 | */ |
181 | size_t capacity() const { |
182 | return capacity_; |
183 | } |
184 | |
185 | /** |
186 | * Return the accumulative number of submitted I/O, since this object |
187 | * has been created. |
188 | */ |
189 | size_t totalSubmits() const { |
190 | return submitted_; |
191 | } |
192 | |
193 | /** |
194 | * If POLLABLE, return a file descriptor that can be passed to poll / epoll |
195 | * and will become readable when any async IO operations have completed. |
196 | * If NOT_POLLABLE, return -1. |
197 | */ |
198 | int pollFd() const { |
199 | return pollFd_; |
200 | } |
201 | |
202 | /** |
203 | * If POLLABLE, call instead of wait after the file descriptor returned |
204 | * by pollFd() became readable. The returned range is valid until the next |
205 | * call to pollCompleted(). |
206 | */ |
207 | Range<Op**> pollCompleted(); |
208 | |
209 | /** |
210 | * Submit an op for execution. |
211 | */ |
212 | void submit(Op* op); |
213 | |
214 | private: |
215 | void decrementPending(); |
216 | void initializeContext(); |
217 | |
218 | enum class WaitType { COMPLETE, CANCEL }; |
219 | Range<AsyncIO::Op**> doWait( |
220 | WaitType type, |
221 | size_t minRequests, |
222 | size_t maxRequests, |
223 | std::vector<Op*>& result); |
224 | |
225 | io_context_t ctx_{nullptr}; |
226 | std::atomic<bool> ctxSet_{false}; |
227 | std::mutex initMutex_; |
228 | |
229 | std::atomic<size_t> pending_{0}; |
230 | std::atomic<size_t> submitted_{0}; |
231 | const size_t capacity_; |
232 | int pollFd_{-1}; |
233 | std::vector<Op*> completed_; |
234 | std::vector<Op*> canceled_; |
235 | }; |
236 | |
237 | /** |
238 | * Wrapper around AsyncIO that allows you to schedule more requests than |
239 | * the AsyncIO's object capacity. Other requests are queued and processed |
240 | * in a FIFO order. |
241 | */ |
242 | class AsyncIOQueue { |
243 | public: |
244 | /** |
245 | * Create a queue, using the given AsyncIO object. |
246 | * The AsyncIO object may not be used by anything else until the |
247 | * queue is destroyed. |
248 | */ |
249 | explicit AsyncIOQueue(AsyncIO* asyncIO); |
250 | ~AsyncIOQueue(); |
251 | |
252 | size_t queued() const { |
253 | return queue_.size(); |
254 | } |
255 | |
256 | /** |
257 | * Submit an op to the AsyncIO queue. The op will be queued until |
258 | * the AsyncIO object has room. |
259 | */ |
260 | void submit(AsyncIOOp* op); |
261 | |
262 | /** |
263 | * Submit a delayed op to the AsyncIO queue; this allows you to postpone |
264 | * creation of the Op (which may require allocating memory, etc) until |
265 | * the AsyncIO object has room. |
266 | */ |
267 | typedef std::function<AsyncIOOp*()> OpFactory; |
268 | void submit(OpFactory op); |
269 | |
270 | private: |
271 | void onCompleted(AsyncIOOp* op); |
272 | void maybeDequeue(); |
273 | |
274 | AsyncIO* asyncIO_; |
275 | |
276 | std::deque<OpFactory> queue_; |
277 | }; |
278 | |
279 | } // namespace folly |
280 | |