| 1 | #if defined(__linux__) || defined(__FreeBSD__) |
| 2 | |
| 3 | #include <Common/Exception.h> |
| 4 | #include <common/logger_useful.h> |
| 5 | #include <Common/MemorySanitizer.h> |
| 6 | #include <Poco/Logger.h> |
| 7 | #include <boost/range/iterator_range.hpp> |
| 8 | #include <errno.h> |
| 9 | |
| 10 | #include <IO/AIOContextPool.h> |
| 11 | |
| 12 | |
| 13 | namespace DB |
| 14 | { |
| 15 | |
| 16 | namespace ErrorCodes |
| 17 | { |
| 18 | extern const int CANNOT_IO_SUBMIT; |
| 19 | extern const int CANNOT_IO_GETEVENTS; |
| 20 | } |
| 21 | |
| 22 | |
| 23 | AIOContextPool::~AIOContextPool() |
| 24 | { |
| 25 | cancelled.store(true, std::memory_order_relaxed); |
| 26 | io_completion_monitor.join(); |
| 27 | } |
| 28 | |
| 29 | |
| 30 | void AIOContextPool::doMonitor() |
| 31 | { |
| 32 | /// continue checking for events unless cancelled |
| 33 | while (!cancelled.load(std::memory_order_relaxed)) |
| 34 | waitForCompletion(); |
| 35 | |
| 36 | /// wait until all requests have been completed |
| 37 | while (!promises.empty()) |
| 38 | waitForCompletion(); |
| 39 | } |
| 40 | |
| 41 | |
| 42 | void AIOContextPool::waitForCompletion() |
| 43 | { |
| 44 | /// array to hold completion events |
| 45 | io_event events[max_concurrent_events]; |
| 46 | |
| 47 | try |
| 48 | { |
| 49 | const auto num_events = getCompletionEvents(events, max_concurrent_events); |
| 50 | fulfillPromises(events, num_events); |
| 51 | notifyProducers(num_events); |
| 52 | } |
| 53 | catch (...) |
| 54 | { |
| 55 | /// there was an error, log it, return to any producer and continue |
| 56 | reportExceptionToAnyProducer(); |
| 57 | tryLogCurrentException("AIOContextPool::waitForCompletion()" ); |
| 58 | } |
| 59 | } |
| 60 | |
| 61 | |
| 62 | int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) |
| 63 | { |
| 64 | timespec timeout{timeout_sec, 0}; |
| 65 | |
| 66 | auto num_events = 0; |
| 67 | |
| 68 | /// request 1 to `max_events` events |
| 69 | while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) |
| 70 | if (errno != EINTR) |
| 71 | throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion" , ErrorCodes::CANNOT_IO_GETEVENTS, errno); |
| 72 | |
| 73 | /// Unpoison the memory returned from a non-instrumented system call. |
| 74 | __msan_unpoison(events, sizeof(*events) * num_events); |
| 75 | |
| 76 | return num_events; |
| 77 | } |
| 78 | |
| 79 | |
| 80 | void AIOContextPool::fulfillPromises(const io_event events[], const int num_events) |
| 81 | { |
| 82 | if (num_events == 0) |
| 83 | return; |
| 84 | |
| 85 | const std::lock_guard lock{mutex}; |
| 86 | |
| 87 | /// look at returned events and find corresponding promise, set result and erase promise from map |
| 88 | for (const auto & event : boost::make_iterator_range(events, events + num_events)) |
| 89 | { |
| 90 | /// get id from event |
| 91 | #if defined(__FreeBSD__) |
| 92 | const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data; |
| 93 | #else |
| 94 | const auto completed_id = event.data; |
| 95 | #endif |
| 96 | |
| 97 | /// set value via promise and release it |
| 98 | const auto it = promises.find(completed_id); |
| 99 | if (it == std::end(promises)) |
| 100 | { |
| 101 | LOG_ERROR(&Poco::Logger::get("AIOcontextPool" ), "Found io_event with unknown id " << completed_id); |
| 102 | continue; |
| 103 | } |
| 104 | |
| 105 | #if defined(__FreeBSD__) |
| 106 | it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata))); |
| 107 | #else |
| 108 | it->second.set_value(event.res); |
| 109 | #endif |
| 110 | promises.erase(it); |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | |
| 115 | void AIOContextPool::notifyProducers(const int num_producers) const |
| 116 | { |
| 117 | if (num_producers == 0) |
| 118 | return; |
| 119 | |
| 120 | if (num_producers > 1) |
| 121 | have_resources.notify_all(); |
| 122 | else |
| 123 | have_resources.notify_one(); |
| 124 | } |
| 125 | |
| 126 | |
| 127 | void AIOContextPool::reportExceptionToAnyProducer() |
| 128 | { |
| 129 | const std::lock_guard lock{mutex}; |
| 130 | |
| 131 | const auto any_promise_it = std::begin(promises); |
| 132 | any_promise_it->second.set_exception(std::current_exception()); |
| 133 | } |
| 134 | |
| 135 | |
| 136 | std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb) |
| 137 | { |
| 138 | std::unique_lock lock{mutex}; |
| 139 | |
| 140 | /// get current id and increment it by one |
| 141 | const auto request_id = next_id; |
| 142 | ++next_id; |
| 143 | |
| 144 | /// create a promise and put request in "queue" |
| 145 | promises.emplace(request_id, std::promise<BytesRead>{}); |
| 146 | /// store id in AIO request for further identification |
| 147 | iocb.aio_data = request_id; |
| 148 | |
| 149 | auto num_requests = 0; |
| 150 | struct iocb * requests[] { &iocb }; |
| 151 | |
| 152 | /// submit a request |
| 153 | while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0) |
| 154 | { |
| 155 | if (errno == EAGAIN) |
| 156 | /// wait until at least one event has been completed (or a spurious wakeup) and try again |
| 157 | have_resources.wait(lock); |
| 158 | else if (errno != EINTR) |
| 159 | throwFromErrno("io_submit: Failed to submit a request for asynchronous IO" , ErrorCodes::CANNOT_IO_SUBMIT); |
| 160 | } |
| 161 | |
| 162 | return promises[request_id].get_future(); |
| 163 | } |
| 164 | |
| 165 | AIOContextPool & AIOContextPool::instance() |
| 166 | { |
| 167 | static AIOContextPool instance; |
| 168 | return instance; |
| 169 | } |
| 170 | |
| 171 | } |
| 172 | |
| 173 | #endif |
| 174 | |