1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <folly/experimental/io/AsyncIO.h>

#include <sys/eventfd.h>
#include <cerrno>
#include <ostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>

#include <folly/Exception.h>
#include <folly/Format.h>
#include <folly/Likely.h>
#include <folly/String.h>
#include <folly/portability/Unistd.h>
33
34namespace folly {
35
36AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37 : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
38 memset(&iocb_, 0, sizeof(iocb_));
39}
40
41void AsyncIOOp::reset(NotificationCallback cb) {
42 CHECK_NE(state_, State::PENDING);
43 cb_ = std::move(cb);
44 state_ = State::UNINITIALIZED;
45 result_ = -EINVAL;
46 memset(&iocb_, 0, sizeof(iocb_));
47}
48
49AsyncIOOp::~AsyncIOOp() {
50 CHECK_NE(state_, State::PENDING);
51}
52
53void AsyncIOOp::start() {
54 DCHECK_EQ(state_, State::INITIALIZED);
55 state_ = State::PENDING;
56}
57
58void AsyncIOOp::complete(ssize_t result) {
59 DCHECK_EQ(state_, State::PENDING);
60 state_ = State::COMPLETED;
61 result_ = result;
62 if (cb_) {
63 cb_(this);
64 }
65}
66
67void AsyncIOOp::cancel() {
68 DCHECK_EQ(state_, State::PENDING);
69 state_ = State::CANCELED;
70}
71
72ssize_t AsyncIOOp::result() const {
73 CHECK_EQ(state_, State::COMPLETED);
74 return result_;
75}
76
77void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
78 init();
79 io_prep_pread(&iocb_, fd, buf, size, start);
80}
81
82void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
83 pread(fd, range.begin(), range.size(), start);
84}
85
86void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
87 init();
88 io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
89}
90
91void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
92 init();
93 io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
94}
95
96void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
97 pwrite(fd, range.begin(), range.size(), start);
98}
99
100void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
101 init();
102 io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
103}
104
105void AsyncIOOp::init() {
106 CHECK_EQ(state_, State::UNINITIALIZED);
107 state_ = State::INITIALIZED;
108}
109
110AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
111 CHECK_GT(capacity_, 0);
112 completed_.reserve(capacity_);
113 if (pollMode == POLLABLE) {
114 pollFd_ = eventfd(0, EFD_NONBLOCK);
115 checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
116 }
117}
118
119AsyncIO::~AsyncIO() {
120 CHECK_EQ(pending_, 0);
121 if (ctx_) {
122 int rc = io_queue_release(ctx_);
123 CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
124 }
125 if (pollFd_ != -1) {
126 CHECK_ERR(close(pollFd_));
127 }
128}
129
130void AsyncIO::decrementPending() {
131 auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
132 DCHECK_GE(p, 1);
133}
134
// Lazily create the kernel AIO context on first submit(). Uses
// double-checked locking: the fast path is a single acquire load of
// ctxSet_; actual initialization is serialized by initMutex_, and the
// relaxed re-check under the lock is safe because the mutex orders it.
void AsyncIO::initializeContext() {
  if (!ctxSet_.load(std::memory_order_acquire)) {
    std::lock_guard<std::mutex> lock(initMutex_);
    if (!ctxSet_.load(std::memory_order_relaxed)) {
      int rc = io_queue_init(capacity_, &ctx_);
      // returns negative errno
      if (rc == -EAGAIN) {
        // -EAGAIN means the system-wide AIO request limit is exhausted;
        // read the current and maximum counts from /proc so the error
        // log explains why.
        long aio_nr, aio_max;
        std::unique_ptr<FILE, int (*)(FILE*)> fp(
            fopen("/proc/sys/fs/aio-nr", "r"), fclose);
        PCHECK(fp);
        CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);

        std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
            fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
        PCHECK(aio_max_fp);
        CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);

        LOG(ERROR) << "No resources for requested capacity of " << capacity_;
        LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
      }

      checkKernelError(rc, "AsyncIO: io_queue_init failed");
      DCHECK(ctx_);
      // Release store pairs with the acquire load above so other threads
      // that skip the lock observe a fully-initialized ctx_.
      ctxSet_.store(true, std::memory_order_release);
    }
  }
}
163
// Submit one prepared op to the kernel. Throws std::range_error if this
// would exceed capacity_, or a system error if io_submit fails; in both
// cases the optimistic pending_ increment is rolled back first.
void AsyncIO::submit(Op* op) {
  CHECK_EQ(op->state(), Op::State::INITIALIZED);
  initializeContext(); // on demand

  // We can increment past capacity, but we'll clean up after ourselves.
  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
  if (p >= capacity_) {
    decrementPending();
    throw std::range_error("AsyncIO: too many pending requests");
  }
  iocb* cb = &op->iocb_;
  cb->data = nullptr; // unused
  if (pollFd_ != -1) {
    // Route this op's completion through our eventfd so pollCompleted()
    // can see it.
    io_set_eventfd(cb, pollFd_);
  }
  int rc = io_submit(ctx_, 1, &cb);
  if (rc < 0) {
    decrementPending();
    throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
  }
  submitted_++;
  // io_submit returns the number of iocbs accepted; we passed exactly one.
  DCHECK_EQ(rc, 1);
  op->start();
}
188
189Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
190 CHECK(ctx_);
191 CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
192 auto p = pending_.load(std::memory_order_acquire);
193 CHECK_LE(minRequests, p);
194 return doWait(WaitType::COMPLETE, minRequests, p, completed_);
195}
196
197Range<AsyncIO::Op**> AsyncIO::cancel() {
198 CHECK(ctx_);
199 auto p = pending_.load(std::memory_order_acquire);
200 return doWait(WaitType::CANCEL, p, p, canceled_);
201}
202
// Non-blockingly reap ops whose completion was signalled through the
// eventfd. Returns an empty range if nothing has completed yet. Only
// valid on POLLABLE objects.
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
  CHECK(ctx_);
  CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
  uint64_t numEvents;
  // This sets the eventFd counter to 0, see
  // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
  ssize_t rc;
  do {
    rc = ::read(pollFd_, &numEvents, 8);
  } while (rc == -1 && errno == EINTR);
  // EAGAIN from a non-blocking eventfd means the counter was zero:
  // no completions to report.
  if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
    return Range<Op**>(); // nothing completed
  }
  checkUnixError(rc, "AsyncIO: read from event fd failed");
  DCHECK_EQ(rc, 8);

  DCHECK_GT(numEvents, 0);
  DCHECK_LE(numEvents, pending_);

  // Don't reap more than numEvents, as we've just reset the counter to 0.
  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
}
225
226Range<AsyncIO::Op**> AsyncIO::doWait(
227 WaitType type,
228 size_t minRequests,
229 size_t maxRequests,
230 std::vector<Op*>& result) {
231 io_event events[maxRequests];
232
233 // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
234 // WaitType::CANCEL we have to wait for IO completion.
235 size_t count = 0;
236 do {
237 int ret;
238 do {
239 // GOTCHA: io_getevents() may returns less than min_nr results if
240 // interrupted after some events have been read (if before, -EINTR
241 // is returned).
242 ret = io_getevents(
243 ctx_,
244 minRequests - count,
245 maxRequests - count,
246 events + count,
247 /* timeout */ nullptr); // wait forever
248 } while (ret == -EINTR);
249 // Check as may not be able to recover without leaking events.
250 CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
251 << errnoStr(-ret);
252 count += ret;
253 } while (count < minRequests);
254 DCHECK_LE(count, maxRequests);
255
256 result.clear();
257 for (size_t i = 0; i < count; ++i) {
258 DCHECK(events[i].obj);
259 Op* op = boost::intrusive::get_parent_from_member(
260 events[i].obj, &AsyncIOOp::iocb_);
261 decrementPending();
262 switch (type) {
263 case WaitType::COMPLETE:
264 op->complete(events[i].res);
265 break;
266 case WaitType::CANCEL:
267 op->cancel();
268 break;
269 }
270 result.push_back(op);
271 }
272
273 return range(result);
274}
275
276AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
277
// Destroying the queue while the underlying AsyncIO still has requests in
// flight is a fatal error — our interposed completion callbacks would
// dangle otherwise.
AsyncIOQueue::~AsyncIOQueue() {
  CHECK_EQ(asyncIO_->pending(), 0);
}
281
282void AsyncIOQueue::submit(AsyncIOOp* op) {
283 submit([op]() { return op; });
284}
285
286void AsyncIOQueue::submit(OpFactory op) {
287 queue_.push_back(op);
288 maybeDequeue();
289}
290
// Called (via the interposed callback installed in maybeDequeue) whenever
// an op finishes; a completion frees capacity, so try to submit more.
void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
  maybeDequeue();
}
294
295void AsyncIOQueue::maybeDequeue() {
296 while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
297 auto& opFactory = queue_.front();
298 auto op = opFactory();
299 queue_.pop_front();
300
301 // Interpose our completion callback
302 auto& nextCb = op->notificationCallback();
303 op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
304 this->onCompleted(op2);
305 if (nextCb) {
306 nextCb(op2);
307 }
308 });
309
310 asyncIO_->submit(op);
311 }
312}
313
314// debugging helpers:
315
316namespace {
317
318#define X(c) \
319 case c: \
320 return #c
321
322const char* asyncIoOpStateToString(AsyncIOOp::State state) {
323 switch (state) {
324 X(AsyncIOOp::State::UNINITIALIZED);
325 X(AsyncIOOp::State::INITIALIZED);
326 X(AsyncIOOp::State::PENDING);
327 X(AsyncIOOp::State::COMPLETED);
328 X(AsyncIOOp::State::CANCELED);
329 }
330 return "<INVALID AsyncIOOp::State>";
331}
332
333const char* iocbCmdToString(short int cmd_short) {
334 io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
335 switch (cmd) {
336 X(IO_CMD_PREAD);
337 X(IO_CMD_PWRITE);
338 X(IO_CMD_FSYNC);
339 X(IO_CMD_FDSYNC);
340 X(IO_CMD_POLL);
341 X(IO_CMD_NOOP);
342 X(IO_CMD_PREADV);
343 X(IO_CMD_PWRITEV);
344 };
345 return "<INVALID io_iocb_cmd>";
346}
347
348#undef X
349
350std::string fd2name(int fd) {
351 std::string path = folly::to<std::string>("/proc/self/fd/", fd);
352 char link[PATH_MAX];
353 const ssize_t length =
354 std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
355 return path.assign(link, length);
356}
357
// Debug-print the libaio control block: common header fields first, then
// the opcode-specific union members (only PREAD/PWRITE are spelled out).
std::ostream& operator<<(std::ostream& os, const iocb& cb) {
  os << folly::format(
      "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
      cb.data,
      cb.key,
      iocbCmdToString(cb.aio_lio_opcode),
      cb.aio_reqprio,
      cb.aio_fildes,
      fd2name(cb.aio_fildes));

  switch (cb.aio_lio_opcode) {
    case IO_CMD_PREAD:
    case IO_CMD_PWRITE:
      // u.c is the "common" union member used by plain pread/pwrite.
      os << folly::format(
          "buf={}, offset={}, nbytes={}, ",
          cb.u.c.buf,
          cb.u.c.offset,
          cb.u.c.nbytes);
      break;
    default:
      os << "[TODO: write debug string for "
         << iocbCmdToString(cb.aio_lio_opcode) << "] ";
      break;
  }

  return os;
}
385
386} // namespace
387
388std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
389 os << "{" << op.state_ << ", ";
390
391 if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
392 os << op.iocb_;
393 }
394
395 if (op.state_ == AsyncIOOp::State::COMPLETED) {
396 os << "result=" << op.result_;
397 if (op.result_ < 0) {
398 os << " (" << errnoStr(-op.result_) << ')';
399 }
400 os << ", ";
401 }
402
403 return os << "}";
404}
405
406std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
407 return os << asyncIoOpStateToString(state);
408}
409
410} // namespace folly
411