1 | #if defined(__linux__) || defined(__FreeBSD__) |
2 | |
3 | #include <Common/Exception.h> |
4 | #include <common/logger_useful.h> |
5 | #include <Common/MemorySanitizer.h> |
6 | #include <Poco/Logger.h> |
7 | #include <boost/range/iterator_range.hpp> |
8 | #include <errno.h> |
9 | |
10 | #include <IO/AIOContextPool.h> |
11 | |
12 | |
13 | namespace DB |
14 | { |
15 | |
16 | namespace ErrorCodes |
17 | { |
18 | extern const int CANNOT_IO_SUBMIT; |
19 | extern const int CANNOT_IO_GETEVENTS; |
20 | } |
21 | |
22 | |
23 | AIOContextPool::~AIOContextPool() |
24 | { |
25 | cancelled.store(true, std::memory_order_relaxed); |
26 | io_completion_monitor.join(); |
27 | } |
28 | |
29 | |
30 | void AIOContextPool::doMonitor() |
31 | { |
32 | /// continue checking for events unless cancelled |
33 | while (!cancelled.load(std::memory_order_relaxed)) |
34 | waitForCompletion(); |
35 | |
36 | /// wait until all requests have been completed |
37 | while (!promises.empty()) |
38 | waitForCompletion(); |
39 | } |
40 | |
41 | |
42 | void AIOContextPool::waitForCompletion() |
43 | { |
44 | /// array to hold completion events |
45 | io_event events[max_concurrent_events]; |
46 | |
47 | try |
48 | { |
49 | const auto num_events = getCompletionEvents(events, max_concurrent_events); |
50 | fulfillPromises(events, num_events); |
51 | notifyProducers(num_events); |
52 | } |
53 | catch (...) |
54 | { |
55 | /// there was an error, log it, return to any producer and continue |
56 | reportExceptionToAnyProducer(); |
57 | tryLogCurrentException("AIOContextPool::waitForCompletion()" ); |
58 | } |
59 | } |
60 | |
61 | |
62 | int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) |
63 | { |
64 | timespec timeout{timeout_sec, 0}; |
65 | |
66 | auto num_events = 0; |
67 | |
68 | /// request 1 to `max_events` events |
69 | while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0) |
70 | if (errno != EINTR) |
71 | throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion" , ErrorCodes::CANNOT_IO_GETEVENTS, errno); |
72 | |
73 | /// Unpoison the memory returned from a non-instrumented system call. |
74 | __msan_unpoison(events, sizeof(*events) * num_events); |
75 | |
76 | return num_events; |
77 | } |
78 | |
79 | |
80 | void AIOContextPool::fulfillPromises(const io_event events[], const int num_events) |
81 | { |
82 | if (num_events == 0) |
83 | return; |
84 | |
85 | const std::lock_guard lock{mutex}; |
86 | |
87 | /// look at returned events and find corresponding promise, set result and erase promise from map |
88 | for (const auto & event : boost::make_iterator_range(events, events + num_events)) |
89 | { |
90 | /// get id from event |
91 | #if defined(__FreeBSD__) |
92 | const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data; |
93 | #else |
94 | const auto completed_id = event.data; |
95 | #endif |
96 | |
97 | /// set value via promise and release it |
98 | const auto it = promises.find(completed_id); |
99 | if (it == std::end(promises)) |
100 | { |
101 | LOG_ERROR(&Poco::Logger::get("AIOcontextPool" ), "Found io_event with unknown id " << completed_id); |
102 | continue; |
103 | } |
104 | |
105 | #if defined(__FreeBSD__) |
106 | it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata))); |
107 | #else |
108 | it->second.set_value(event.res); |
109 | #endif |
110 | promises.erase(it); |
111 | } |
112 | } |
113 | |
114 | |
115 | void AIOContextPool::notifyProducers(const int num_producers) const |
116 | { |
117 | if (num_producers == 0) |
118 | return; |
119 | |
120 | if (num_producers > 1) |
121 | have_resources.notify_all(); |
122 | else |
123 | have_resources.notify_one(); |
124 | } |
125 | |
126 | |
127 | void AIOContextPool::reportExceptionToAnyProducer() |
128 | { |
129 | const std::lock_guard lock{mutex}; |
130 | |
131 | const auto any_promise_it = std::begin(promises); |
132 | any_promise_it->second.set_exception(std::current_exception()); |
133 | } |
134 | |
135 | |
136 | std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb) |
137 | { |
138 | std::unique_lock lock{mutex}; |
139 | |
140 | /// get current id and increment it by one |
141 | const auto request_id = next_id; |
142 | ++next_id; |
143 | |
144 | /// create a promise and put request in "queue" |
145 | promises.emplace(request_id, std::promise<BytesRead>{}); |
146 | /// store id in AIO request for further identification |
147 | iocb.aio_data = request_id; |
148 | |
149 | auto num_requests = 0; |
150 | struct iocb * requests[] { &iocb }; |
151 | |
152 | /// submit a request |
153 | while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0) |
154 | { |
155 | if (errno == EAGAIN) |
156 | /// wait until at least one event has been completed (or a spurious wakeup) and try again |
157 | have_resources.wait(lock); |
158 | else if (errno != EINTR) |
159 | throwFromErrno("io_submit: Failed to submit a request for asynchronous IO" , ErrorCodes::CANNOT_IO_SUBMIT); |
160 | } |
161 | |
162 | return promises[request_id].get_future(); |
163 | } |
164 | |
165 | AIOContextPool & AIOContextPool::instance() |
166 | { |
167 | static AIOContextPool instance; |
168 | return instance; |
169 | } |
170 | |
171 | } |
172 | |
173 | #endif |
174 | |