| 1 | #if defined(__linux__) || defined(__FreeBSD__) | 
|---|
| 2 |  | 
|---|
| 3 | #include <Common/Exception.h> | 
|---|
| 4 | #include <common/logger_useful.h> | 
|---|
| 5 | #include <Common/MemorySanitizer.h> | 
|---|
| 6 | #include <Poco/Logger.h> | 
|---|
| 7 | #include <boost/range/iterator_range.hpp> | 
|---|
| 8 | #include <errno.h> | 
|---|
| 9 |  | 
|---|
| 10 | #include <IO/AIOContextPool.h> | 
|---|
| 11 |  | 
|---|
| 12 |  | 
|---|
| 13 | namespace DB | 
|---|
| 14 | { | 
|---|
| 15 |  | 
|---|
/// Error codes used by this translation unit; they are only declared here
/// (`extern`), their definitions live elsewhere.
namespace ErrorCodes
{
    /// passed to throwFromErrno() when io_getevents fails
    extern const int CANNOT_IO_GETEVENTS;
    /// passed to throwFromErrno() when io_submit fails
    extern const int CANNOT_IO_SUBMIT;
}
|---|
| 21 |  | 
|---|
| 22 |  | 
|---|
| 23 | AIOContextPool::~AIOContextPool() | 
|---|
| 24 | { | 
|---|
| 25 | cancelled.store(true, std::memory_order_relaxed); | 
|---|
| 26 | io_completion_monitor.join(); | 
|---|
| 27 | } | 
|---|
| 28 |  | 
|---|
| 29 |  | 
|---|
| 30 | void AIOContextPool::doMonitor() | 
|---|
| 31 | { | 
|---|
| 32 | /// continue checking for events unless cancelled | 
|---|
| 33 | while (!cancelled.load(std::memory_order_relaxed)) | 
|---|
| 34 | waitForCompletion(); | 
|---|
| 35 |  | 
|---|
| 36 | /// wait until all requests have been completed | 
|---|
| 37 | while (!promises.empty()) | 
|---|
| 38 | waitForCompletion(); | 
|---|
| 39 | } | 
|---|
| 40 |  | 
|---|
| 41 |  | 
|---|
| 42 | void AIOContextPool::waitForCompletion() | 
|---|
| 43 | { | 
|---|
| 44 | /// array to hold completion events | 
|---|
| 45 | io_event events[max_concurrent_events]; | 
|---|
| 46 |  | 
|---|
| 47 | try | 
|---|
| 48 | { | 
|---|
| 49 | const auto num_events = getCompletionEvents(events, max_concurrent_events); | 
|---|
| 50 | fulfillPromises(events, num_events); | 
|---|
| 51 | notifyProducers(num_events); | 
|---|
| 52 | } | 
|---|
| 53 | catch (...) | 
|---|
| 54 | { | 
|---|
| 55 | /// there was an error, log it, return to any producer and continue | 
|---|
| 56 | reportExceptionToAnyProducer(); | 
|---|
| 57 | tryLogCurrentException( "AIOContextPool::waitForCompletion()"); | 
|---|
| 58 | } | 
|---|
| 59 | } | 
|---|
| 60 |  | 
|---|
| 61 |  | 
|---|
| 62 | int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) | 
|---|
| 63 | { | 
|---|
| 64 | timespec timeout{timeout_sec, 0}; | 
|---|
| 65 |  | 
|---|
| 66 | auto num_events = 0; | 
|---|
| 67 |  | 
|---|
| 68 | /// request 1 to `max_events` events | 
|---|
| 69 | while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) | 
|---|
| 70 | if (errno != EINTR) | 
|---|
| 71 | throwFromErrno( "io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno); | 
|---|
| 72 |  | 
|---|
| 73 | /// Unpoison the memory returned from a non-instrumented system call. | 
|---|
| 74 | __msan_unpoison(events, sizeof(*events) * num_events); | 
|---|
| 75 |  | 
|---|
| 76 | return num_events; | 
|---|
| 77 | } | 
|---|
| 78 |  | 
|---|
| 79 |  | 
|---|
| 80 | void AIOContextPool::fulfillPromises(const io_event events[], const int num_events) | 
|---|
| 81 | { | 
|---|
| 82 | if (num_events == 0) | 
|---|
| 83 | return; | 
|---|
| 84 |  | 
|---|
| 85 | const std::lock_guard lock{mutex}; | 
|---|
| 86 |  | 
|---|
| 87 | /// look at returned events and find corresponding promise, set result and erase promise from map | 
|---|
| 88 | for (const auto & event : boost::make_iterator_range(events, events + num_events)) | 
|---|
| 89 | { | 
|---|
| 90 | /// get id from event | 
|---|
| 91 | #if defined(__FreeBSD__) | 
|---|
| 92 | const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data; | 
|---|
| 93 | #else | 
|---|
| 94 | const auto completed_id = event.data; | 
|---|
| 95 | #endif | 
|---|
| 96 |  | 
|---|
| 97 | /// set value via promise and release it | 
|---|
| 98 | const auto it = promises.find(completed_id); | 
|---|
| 99 | if (it == std::end(promises)) | 
|---|
| 100 | { | 
|---|
| 101 | LOG_ERROR(&Poco::Logger::get( "AIOcontextPool"), "Found io_event with unknown id "<< completed_id); | 
|---|
| 102 | continue; | 
|---|
| 103 | } | 
|---|
| 104 |  | 
|---|
| 105 | #if defined(__FreeBSD__) | 
|---|
| 106 | it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata))); | 
|---|
| 107 | #else | 
|---|
| 108 | it->second.set_value(event.res); | 
|---|
| 109 | #endif | 
|---|
| 110 | promises.erase(it); | 
|---|
| 111 | } | 
|---|
| 112 | } | 
|---|
| 113 |  | 
|---|
| 114 |  | 
|---|
| 115 | void AIOContextPool::notifyProducers(const int num_producers) const | 
|---|
| 116 | { | 
|---|
| 117 | if (num_producers == 0) | 
|---|
| 118 | return; | 
|---|
| 119 |  | 
|---|
| 120 | if (num_producers > 1) | 
|---|
| 121 | have_resources.notify_all(); | 
|---|
| 122 | else | 
|---|
| 123 | have_resources.notify_one(); | 
|---|
| 124 | } | 
|---|
| 125 |  | 
|---|
| 126 |  | 
|---|
| 127 | void AIOContextPool::reportExceptionToAnyProducer() | 
|---|
| 128 | { | 
|---|
| 129 | const std::lock_guard lock{mutex}; | 
|---|
| 130 |  | 
|---|
| 131 | const auto any_promise_it = std::begin(promises); | 
|---|
| 132 | any_promise_it->second.set_exception(std::current_exception()); | 
|---|
| 133 | } | 
|---|
| 134 |  | 
|---|
| 135 |  | 
|---|
| 136 | std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb) | 
|---|
| 137 | { | 
|---|
| 138 | std::unique_lock lock{mutex}; | 
|---|
| 139 |  | 
|---|
| 140 | /// get current id and increment it by one | 
|---|
| 141 | const auto request_id = next_id; | 
|---|
| 142 | ++next_id; | 
|---|
| 143 |  | 
|---|
| 144 | /// create a promise and put request in "queue" | 
|---|
| 145 | promises.emplace(request_id, std::promise<BytesRead>{}); | 
|---|
| 146 | /// store id in AIO request for further identification | 
|---|
| 147 | iocb.aio_data = request_id; | 
|---|
| 148 |  | 
|---|
| 149 | auto num_requests = 0; | 
|---|
| 150 | struct iocb * requests[] { &iocb }; | 
|---|
| 151 |  | 
|---|
| 152 | /// submit a request | 
|---|
| 153 | while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0) | 
|---|
| 154 | { | 
|---|
| 155 | if (errno == EAGAIN) | 
|---|
| 156 | /// wait until at least one event has been completed (or a spurious wakeup) and try again | 
|---|
| 157 | have_resources.wait(lock); | 
|---|
| 158 | else if (errno != EINTR) | 
|---|
| 159 | throwFromErrno( "io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT); | 
|---|
| 160 | } | 
|---|
| 161 |  | 
|---|
| 162 | return promises[request_id].get_future(); | 
|---|
| 163 | } | 
|---|
| 164 |  | 
|---|
| 165 | AIOContextPool & AIOContextPool::instance() | 
|---|
| 166 | { | 
|---|
| 167 | static AIOContextPool instance; | 
|---|
| 168 | return instance; | 
|---|
| 169 | } | 
|---|
| 170 |  | 
|---|
| 171 | } | 
|---|
| 172 |  | 
|---|
| 173 | #endif | 
|---|
| 174 |  | 
|---|