1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <sys/types.h>
20
21#include <atomic>
22#include <cstdint>
23#include <deque>
24#include <functional>
25#include <iosfwd>
26#include <mutex>
27#include <utility>
28#include <vector>
29
30#include <boost/noncopyable.hpp>
31#include <libaio.h>
32
33#include <folly/Portability.h>
34#include <folly/Range.h>
35#include <folly/portability/SysUio.h>
36
37namespace folly {
38
39/**
40 * An AsyncIOOp represents a pending operation. You may set a notification
41 * callback or you may use this class's methods directly.
42 *
43 * The op must remain allocated until it is completed or canceled.
44 */
45class AsyncIOOp : private boost::noncopyable {
46 friend class AsyncIO;
47 friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
48
49 public:
50 typedef std::function<void(AsyncIOOp*)> NotificationCallback;
51
52 explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
53 ~AsyncIOOp();
54
55 enum class State {
56 UNINITIALIZED,
57 INITIALIZED,
58 PENDING,
59 COMPLETED,
60 CANCELED,
61 };
62
63 /**
64 * Initiate a read request.
65 */
66 void pread(int fd, void* buf, size_t size, off_t start);
67 void pread(int fd, Range<unsigned char*> range, off_t start);
68 void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
69
70 /**
71 * Initiate a write request.
72 */
73 void pwrite(int fd, const void* buf, size_t size, off_t start);
74 void pwrite(int fd, Range<const unsigned char*> range, off_t start);
75 void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
76
77 /**
78 * Return the current operation state.
79 */
80 State state() const {
81 return state_;
82 }
83
84 /**
85 * Reset the operation for reuse. It is an error to call reset() on
86 * an Op that is still pending.
87 */
88 void reset(NotificationCallback cb = NotificationCallback());
89
90 void setNotificationCallback(NotificationCallback cb) {
91 cb_ = std::move(cb);
92 }
93 const NotificationCallback& notificationCallback() const {
94 return cb_;
95 }
96
97 /**
98 * Retrieve the result of this operation. Returns >=0 on success,
99 * -errno on failure (that is, using the Linux kernel error reporting
100 * conventions). Use checkKernelError (folly/Exception.h) on the result to
101 * throw a std::system_error in case of error instead.
102 *
103 * It is an error to call this if the Op hasn't completed.
104 */
105 ssize_t result() const;
106
107 private:
108 void init();
109 void start();
110 void complete(ssize_t result);
111 void cancel();
112
113 NotificationCallback cb_;
114 iocb iocb_;
115 State state_;
116 ssize_t result_;
117};
118
119std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
120std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
121
122/**
123 * C++ interface around Linux Async IO.
124 */
125class AsyncIO : private boost::noncopyable {
126 public:
127 typedef AsyncIOOp Op;
128
129 enum PollMode {
130 NOT_POLLABLE,
131 POLLABLE,
132 };
133
134 /**
135 * Create an AsyncIO context capable of holding at most 'capacity' pending
136 * requests at the same time. As requests complete, others can be scheduled,
137 * as long as this limit is not exceeded.
138 *
139 * Note: the maximum number of allowed concurrent requests is controlled
140 * by the fs.aio-max-nr sysctl, the default value is usually 64K.
141 *
142 * If pollMode is POLLABLE, pollFd() will return a file descriptor that
143 * can be passed to poll / epoll / select and will become readable when
144 * any IOs on this AsyncIO have completed. If you do this, you must use
145 * pollCompleted() instead of wait() -- do not read from the pollFd()
146 * file descriptor directly.
147 *
148 * You may use the same AsyncIO object from multiple threads, as long as
149 * there is only one concurrent caller of wait() / pollCompleted() / cancel()
150 * (perhaps by always calling it from the same thread, or by providing
151 * appropriate mutual exclusion). In this case, pending() returns a snapshot
152 * of the current number of pending requests.
153 */
154 explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
155 ~AsyncIO();
156
157 /**
158 * Wait for at least minRequests to complete. Returns the requests that
159 * have completed; the returned range is valid until the next call to
160 * wait(). minRequests may be 0 to not block.
161 */
162 Range<Op**> wait(size_t minRequests);
163
164 /**
165 * Cancel all pending requests and return them; the returned range is
166 * valid until the next call to cancel().
167 */
168 Range<Op**> cancel();
169
170 /**
171 * Return the number of pending requests.
172 */
173 size_t pending() const {
174 return pending_;
175 }
176
177 /**
178 * Return the maximum number of requests that can be kept outstanding
179 * at any one time.
180 */
181 size_t capacity() const {
182 return capacity_;
183 }
184
185 /**
186 * Return the accumulative number of submitted I/O, since this object
187 * has been created.
188 */
189 size_t totalSubmits() const {
190 return submitted_;
191 }
192
193 /**
194 * If POLLABLE, return a file descriptor that can be passed to poll / epoll
195 * and will become readable when any async IO operations have completed.
196 * If NOT_POLLABLE, return -1.
197 */
198 int pollFd() const {
199 return pollFd_;
200 }
201
202 /**
203 * If POLLABLE, call instead of wait after the file descriptor returned
204 * by pollFd() became readable. The returned range is valid until the next
205 * call to pollCompleted().
206 */
207 Range<Op**> pollCompleted();
208
209 /**
210 * Submit an op for execution.
211 */
212 void submit(Op* op);
213
214 private:
215 void decrementPending();
216 void initializeContext();
217
218 enum class WaitType { COMPLETE, CANCEL };
219 Range<AsyncIO::Op**> doWait(
220 WaitType type,
221 size_t minRequests,
222 size_t maxRequests,
223 std::vector<Op*>& result);
224
225 io_context_t ctx_{nullptr};
226 std::atomic<bool> ctxSet_{false};
227 std::mutex initMutex_;
228
229 std::atomic<size_t> pending_{0};
230 std::atomic<size_t> submitted_{0};
231 const size_t capacity_;
232 int pollFd_{-1};
233 std::vector<Op*> completed_;
234 std::vector<Op*> canceled_;
235};
236
237/**
238 * Wrapper around AsyncIO that allows you to schedule more requests than
239 * the AsyncIO's object capacity. Other requests are queued and processed
240 * in a FIFO order.
241 */
242class AsyncIOQueue {
243 public:
244 /**
245 * Create a queue, using the given AsyncIO object.
246 * The AsyncIO object may not be used by anything else until the
247 * queue is destroyed.
248 */
249 explicit AsyncIOQueue(AsyncIO* asyncIO);
250 ~AsyncIOQueue();
251
252 size_t queued() const {
253 return queue_.size();
254 }
255
256 /**
257 * Submit an op to the AsyncIO queue. The op will be queued until
258 * the AsyncIO object has room.
259 */
260 void submit(AsyncIOOp* op);
261
262 /**
263 * Submit a delayed op to the AsyncIO queue; this allows you to postpone
264 * creation of the Op (which may require allocating memory, etc) until
265 * the AsyncIO object has room.
266 */
267 typedef std::function<AsyncIOOp*()> OpFactory;
268 void submit(OpFactory op);
269
270 private:
271 void onCompleted(AsyncIOOp* op);
272 void maybeDequeue();
273
274 AsyncIO* asyncIO_;
275
276 std::deque<OpFactory> queue_;
277};
278
279} // namespace folly
280