1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/experimental/io/AsyncIO.h> |
18 | |
#include <sys/eventfd.h>

#include <cerrno>
#include <ostream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>

#include <folly/Exception.h>
#include <folly/Format.h>
#include <folly/Likely.h>
#include <folly/String.h>
#include <folly/portability/Unistd.h>
33 | |
34 | namespace folly { |
35 | |
// Construct an op in the UNINITIALIZED state with a sentinel error result;
// the kernel iocb is zeroed so a later io_prep_* call starts from a clean
// control block.
AsyncIOOp::AsyncIOOp(NotificationCallback cb)
    : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
  memset(&iocb_, 0, sizeof(iocb_));
}
40 | |
41 | void AsyncIOOp::reset(NotificationCallback cb) { |
42 | CHECK_NE(state_, State::PENDING); |
43 | cb_ = std::move(cb); |
44 | state_ = State::UNINITIALIZED; |
45 | result_ = -EINVAL; |
46 | memset(&iocb_, 0, sizeof(iocb_)); |
47 | } |
48 | |
// Destroying an op the kernel may still write to (PENDING) would be a
// use-after-free, so it is a hard error.
AsyncIOOp::~AsyncIOOp() {
  CHECK_NE(state_, State::PENDING);
}
52 | |
// Mark the op as submitted to the kernel; only valid after an io_prep_*
// helper has moved it to INITIALIZED.
void AsyncIOOp::start() {
  DCHECK_EQ(state_, State::INITIALIZED);
  state_ = State::PENDING;
}
57 | |
58 | void AsyncIOOp::complete(ssize_t result) { |
59 | DCHECK_EQ(state_, State::PENDING); |
60 | state_ = State::COMPLETED; |
61 | result_ = result; |
62 | if (cb_) { |
63 | cb_(this); |
64 | } |
65 | } |
66 | |
// Transition PENDING -> CANCELED. Unlike complete(), the notification
// callback is not invoked.
void AsyncIOOp::cancel() {
  DCHECK_EQ(state_, State::PENDING);
  state_ = State::CANCELED;
}
71 | |
// Return the operation's result as stored by complete() (negative values
// are errnos — see operator<<). Only valid once the op has COMPLETED.
ssize_t AsyncIOOp::result() const {
  CHECK_EQ(state_, State::COMPLETED);
  return result_;
}
76 | |
// Prepare a positional read of `size` bytes from `fd` at offset `start`
// into `buf`. Moves the op from UNINITIALIZED to INITIALIZED.
void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
  init();
  io_prep_pread(&iocb_, fd, buf, size, start);
}
81 | |
// Convenience overload: read into the given byte range.
void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
  pread(fd, range.begin(), range.size(), start);
}
85 | |
// Prepare a positional scatter read into the `iovcnt` buffers described by
// `iov`, starting at offset `start`.
void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
  init();
  io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
}
90 | |
// Prepare a positional write of `size` bytes from `buf` to `fd` at `start`.
void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
  init();
  // io_prep_pwrite takes a non-const buffer pointer even though the data
  // is only read from, hence the const_cast.
  io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
}
95 | |
// Convenience overload: write the given byte range.
void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
  pwrite(fd, range.begin(), range.size(), start);
}
99 | |
// Prepare a positional gather write from the `iovcnt` buffers described by
// `iov`, starting at offset `start`.
void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
  init();
  io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
}
104 | |
// Common transition for all io_prep_* helpers: an op may only be prepared
// once per lifecycle (again only after reset() returns it to UNINITIALIZED).
void AsyncIOOp::init() {
  CHECK_EQ(state_, State::UNINITIALIZED);
  state_ = State::INITIALIZED;
}
109 | |
110 | AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) { |
111 | CHECK_GT(capacity_, 0); |
112 | completed_.reserve(capacity_); |
113 | if (pollMode == POLLABLE) { |
114 | pollFd_ = eventfd(0, EFD_NONBLOCK); |
115 | checkUnixError(pollFd_, "AsyncIO: eventfd creation failed" ); |
116 | } |
117 | } |
118 | |
119 | AsyncIO::~AsyncIO() { |
120 | CHECK_EQ(pending_, 0); |
121 | if (ctx_) { |
122 | int rc = io_queue_release(ctx_); |
123 | CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc); |
124 | } |
125 | if (pollFd_ != -1) { |
126 | CHECK_ERR(close(pollFd_)); |
127 | } |
128 | } |
129 | |
130 | void AsyncIO::decrementPending() { |
131 | auto p = pending_.fetch_add(-1, std::memory_order_acq_rel); |
132 | DCHECK_GE(p, 1); |
133 | } |
134 | |
// Lazily create the kernel AIO context on first submit(), using
// double-checked locking: the fast path is a single acquire load of
// ctxSet_; initMutex_ serializes the actual io_queue_init() call.
void AsyncIO::initializeContext() {
  if (!ctxSet_.load(std::memory_order_acquire)) {
    std::lock_guard<std::mutex> lock(initMutex_);
    if (!ctxSet_.load(std::memory_order_relaxed)) {
      int rc = io_queue_init(capacity_, &ctx_);
      // returns negative errno
      if (rc == -EAGAIN) {
        // EAGAIN means the system-wide AIO request limit is exhausted; log
        // the current and maximum counts from /proc to aid diagnosis.
        long aio_nr, aio_max;
        std::unique_ptr<FILE, int (*)(FILE*)> fp(
            fopen("/proc/sys/fs/aio-nr", "r"), fclose);
        PCHECK(fp);
        CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);

        std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
            fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
        PCHECK(aio_max_fp);
        CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);

        LOG(ERROR) << "No resources for requested capacity of " << capacity_;
        LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
      }

      checkKernelError(rc, "AsyncIO: io_queue_init failed");
      DCHECK(ctx_);
      // Release store pairs with the acquire load above so other threads
      // observe a fully initialized ctx_.
      ctxSet_.store(true, std::memory_order_release);
    }
  }
}
163 | |
// Submit a prepared op to the kernel. Throws std::range_error if capacity_
// ops are already pending, or a system error if io_submit() fails; in both
// cases the pending counter is rolled back before throwing.
void AsyncIO::submit(Op* op) {
  CHECK_EQ(op->state(), Op::State::INITIALIZED);
  initializeContext(); // on demand

  // We can increment past capacity, but we'll clean up after ourselves.
  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
  if (p >= capacity_) {
    decrementPending();
    throw std::range_error("AsyncIO: too many pending requests");
  }
  iocb* cb = &op->iocb_;
  cb->data = nullptr; // unused
  if (pollFd_ != -1) {
    // Pollable mode: ask the kernel to signal completion on our eventfd.
    io_set_eventfd(cb, pollFd_);
  }
  int rc = io_submit(ctx_, 1, &cb);
  if (rc < 0) {
    decrementPending();
    throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
  }
  submitted_++;
  DCHECK_EQ(rc, 1); // we submitted exactly one iocb
  op->start();
}
188 | |
// Block until at least minRequests pending ops complete and return them.
// Only valid on non-pollable objects (pollable ones use pollCompleted()).
// The returned Range is backed by completed_, so it is invalidated by the
// next call that refills that vector.
Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
  CHECK(ctx_);
  CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
  auto p = pending_.load(std::memory_order_acquire);
  CHECK_LE(minRequests, p); // can't wait for more ops than are in flight
  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
}
196 | |
// "Cancel" all pending ops. Linux AIO cannot abort in-flight I/O (see
// doWait), so this waits for every pending op and marks it CANCELED
// instead of COMPLETED; notification callbacks are not invoked.
Range<AsyncIO::Op**> AsyncIO::cancel() {
  CHECK(ctx_);
  auto p = pending_.load(std::memory_order_acquire);
  return doWait(WaitType::CANCEL, p, p, canceled_);
}
202 | |
// Reap completions signaled via the eventfd (pollable mode only). Returns
// an empty range when nothing has completed yet; does not block.
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
  CHECK(ctx_);
  CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
  uint64_t numEvents;
  // This sets the eventFd counter to 0, see
  // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
  ssize_t rc;
  do {
    rc = ::read(pollFd_, &numEvents, 8);
  } while (rc == -1 && errno == EINTR);
  if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
    return Range<Op**>(); // nothing completed
  }
  checkUnixError(rc, "AsyncIO: read from event fd failed");
  DCHECK_EQ(rc, 8); // eventfd reads are always 8 bytes

  DCHECK_GT(numEvents, 0);
  DCHECK_LE(numEvents, pending_);

  // Don't reap more than numEvents, as we've just reset the counter to 0.
  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
}
225 | |
226 | Range<AsyncIO::Op**> AsyncIO::doWait( |
227 | WaitType type, |
228 | size_t minRequests, |
229 | size_t maxRequests, |
230 | std::vector<Op*>& result) { |
231 | io_event events[maxRequests]; |
232 | |
233 | // Unfortunately, Linux AIO doesn't implement io_cancel, so even for |
234 | // WaitType::CANCEL we have to wait for IO completion. |
235 | size_t count = 0; |
236 | do { |
237 | int ret; |
238 | do { |
239 | // GOTCHA: io_getevents() may returns less than min_nr results if |
240 | // interrupted after some events have been read (if before, -EINTR |
241 | // is returned). |
242 | ret = io_getevents( |
243 | ctx_, |
244 | minRequests - count, |
245 | maxRequests - count, |
246 | events + count, |
247 | /* timeout */ nullptr); // wait forever |
248 | } while (ret == -EINTR); |
249 | // Check as may not be able to recover without leaking events. |
250 | CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error " |
251 | << errnoStr(-ret); |
252 | count += ret; |
253 | } while (count < minRequests); |
254 | DCHECK_LE(count, maxRequests); |
255 | |
256 | result.clear(); |
257 | for (size_t i = 0; i < count; ++i) { |
258 | DCHECK(events[i].obj); |
259 | Op* op = boost::intrusive::get_parent_from_member( |
260 | events[i].obj, &AsyncIOOp::iocb_); |
261 | decrementPending(); |
262 | switch (type) { |
263 | case WaitType::COMPLETE: |
264 | op->complete(events[i].res); |
265 | break; |
266 | case WaitType::CANCEL: |
267 | op->cancel(); |
268 | break; |
269 | } |
270 | result.push_back(op); |
271 | } |
272 | |
273 | return range(result); |
274 | } |
275 | |
276 | AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {} |
277 | |
// All submitted ops must have completed before the queue is destroyed.
AsyncIOQueue::~AsyncIOQueue() {
  CHECK_EQ(asyncIO_->pending(), 0);
}
281 | |
// Enqueue an already-prepared op by wrapping it in a trivial factory.
void AsyncIOQueue::submit(AsyncIOOp* op) {
  submit([op]() { return op; });
}
285 | |
286 | void AsyncIOQueue::submit(OpFactory op) { |
287 | queue_.push_back(op); |
288 | maybeDequeue(); |
289 | } |
290 | |
// Completion hook interposed by maybeDequeue(): a finished op frees a
// capacity slot, so try to submit more queued work.
void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
  maybeDequeue();
}
294 | |
// Submit queued ops while the underlying AsyncIO has free capacity. The
// factory is only invoked once its op can actually be submitted.
void AsyncIOQueue::maybeDequeue() {
  while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
    auto& opFactory = queue_.front();
    auto op = opFactory();
    queue_.pop_front();

    // Interpose our completion callback
    // (nextCb is captured by value below, preserving the user's callback
    // before setNotificationCallback replaces it on the op).
    auto& nextCb = op->notificationCallback();
    op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
      this->onCompleted(op2);
      if (nextCb) {
        nextCb(op2);
      }
    });

    asyncIO_->submit(op);
  }
}
313 | |
314 | // debugging helpers: |
315 | |
316 | namespace { |
317 | |
318 | #define X(c) \ |
319 | case c: \ |
320 | return #c |
321 | |
// Map an AsyncIOOp::State to its enumerator name via the X stringifying
// macro defined above.
const char* asyncIoOpStateToString(AsyncIOOp::State state) {
  switch (state) {
    X(AsyncIOOp::State::UNINITIALIZED);
    X(AsyncIOOp::State::INITIALIZED);
    X(AsyncIOOp::State::PENDING);
    X(AsyncIOOp::State::COMPLETED);
    X(AsyncIOOp::State::CANCELED);
  }
  return "<INVALID AsyncIOOp::State>";
}
332 | |
// Map a libaio command code (iocb::aio_lio_opcode is stored as a short) to
// its enumerator name via the X stringifying macro defined above.
const char* iocbCmdToString(short int cmd_short) {
  io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
  switch (cmd) {
    X(IO_CMD_PREAD);
    X(IO_CMD_PWRITE);
    X(IO_CMD_FSYNC);
    X(IO_CMD_FDSYNC);
    X(IO_CMD_POLL);
    X(IO_CMD_NOOP);
    X(IO_CMD_PREADV);
    X(IO_CMD_PWRITEV);
  };
  return "<INVALID io_iocb_cmd>";
}
347 | |
348 | #undef X |
349 | |
350 | std::string fd2name(int fd) { |
351 | std::string path = folly::to<std::string>("/proc/self/fd/" , fd); |
352 | char link[PATH_MAX]; |
353 | const ssize_t length = |
354 | std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0); |
355 | return path.assign(link, length); |
356 | } |
357 | |
// Debug-print the interesting fields of a kernel iocb; buffer/offset/size
// details are only decoded for PREAD and PWRITE commands.
std::ostream& operator<<(std::ostream& os, const iocb& cb) {
  os << folly::format(
      "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
      cb.data,
      cb.key,
      iocbCmdToString(cb.aio_lio_opcode),
      cb.aio_reqprio,
      cb.aio_fildes,
      fd2name(cb.aio_fildes));

  switch (cb.aio_lio_opcode) {
    case IO_CMD_PREAD:
    case IO_CMD_PWRITE:
      os << folly::format(
          "buf={}, offset={}, nbytes={}, ",
          cb.u.c.buf,
          cb.u.c.offset,
          cb.u.c.nbytes);
      break;
    default:
      os << "[TODO: write debug string for "
         << iocbCmdToString(cb.aio_lio_opcode) << "] ";
      break;
  }

  return os;
}
385 | |
386 | } // namespace |
387 | |
// Debug-print an op: its state, then (once initialized) the embedded iocb,
// then (once completed) its result, decoding negative results as errnos.
std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
  os << "{" << op.state_ << ", ";

  if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
    os << op.iocb_;
  }

  if (op.state_ == AsyncIOOp::State::COMPLETED) {
    os << "result=" << op.result_;
    if (op.result_ < 0) {
      os << " (" << errnoStr(-op.result_) << ')';
    }
    os << ", ";
  }

  return os << "}";
}
405 | |
// Stream an op state by name (see asyncIoOpStateToString above).
std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
  return os << asyncIoOpStateToString(state);
}
409 | |
410 | } // namespace folly |
411 | |