1#if defined(__linux__) || defined(__FreeBSD__)
2
3#include <Common/Exception.h>
4#include <common/logger_useful.h>
5#include <Common/MemorySanitizer.h>
6#include <Poco/Logger.h>
7#include <boost/range/iterator_range.hpp>
8#include <errno.h>
9
10#include <IO/AIOContextPool.h>
11
12
13namespace DB
14{
15
16namespace ErrorCodes
17{
18 extern const int CANNOT_IO_SUBMIT;
19 extern const int CANNOT_IO_GETEVENTS;
20}
21
22
23AIOContextPool::~AIOContextPool()
24{
25 cancelled.store(true, std::memory_order_relaxed);
26 io_completion_monitor.join();
27}
28
29
30void AIOContextPool::doMonitor()
31{
32 /// continue checking for events unless cancelled
33 while (!cancelled.load(std::memory_order_relaxed))
34 waitForCompletion();
35
36 /// wait until all requests have been completed
37 while (!promises.empty())
38 waitForCompletion();
39}
40
41
42void AIOContextPool::waitForCompletion()
43{
44 /// array to hold completion events
45 io_event events[max_concurrent_events];
46
47 try
48 {
49 const auto num_events = getCompletionEvents(events, max_concurrent_events);
50 fulfillPromises(events, num_events);
51 notifyProducers(num_events);
52 }
53 catch (...)
54 {
55 /// there was an error, log it, return to any producer and continue
56 reportExceptionToAnyProducer();
57 tryLogCurrentException("AIOContextPool::waitForCompletion()");
58 }
59}
60
61
62int AIOContextPool::getCompletionEvents(io_event events[], const int max_events)
63{
64 timespec timeout{timeout_sec, 0};
65
66 auto num_events = 0;
67
68 /// request 1 to `max_events` events
69 while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
70 if (errno != EINTR)
71 throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);
72
73 /// Unpoison the memory returned from a non-instrumented system call.
74 __msan_unpoison(events, sizeof(*events) * num_events);
75
76 return num_events;
77}
78
79
80void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
81{
82 if (num_events == 0)
83 return;
84
85 const std::lock_guard lock{mutex};
86
87 /// look at returned events and find corresponding promise, set result and erase promise from map
88 for (const auto & event : boost::make_iterator_range(events, events + num_events))
89 {
90 /// get id from event
91#if defined(__FreeBSD__)
92 const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data;
93#else
94 const auto completed_id = event.data;
95#endif
96
97 /// set value via promise and release it
98 const auto it = promises.find(completed_id);
99 if (it == std::end(promises))
100 {
101 LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << completed_id);
102 continue;
103 }
104
105#if defined(__FreeBSD__)
106 it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
107#else
108 it->second.set_value(event.res);
109#endif
110 promises.erase(it);
111 }
112}
113
114
115void AIOContextPool::notifyProducers(const int num_producers) const
116{
117 if (num_producers == 0)
118 return;
119
120 if (num_producers > 1)
121 have_resources.notify_all();
122 else
123 have_resources.notify_one();
124}
125
126
127void AIOContextPool::reportExceptionToAnyProducer()
128{
129 const std::lock_guard lock{mutex};
130
131 const auto any_promise_it = std::begin(promises);
132 any_promise_it->second.set_exception(std::current_exception());
133}
134
135
136std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
137{
138 std::unique_lock lock{mutex};
139
140 /// get current id and increment it by one
141 const auto request_id = next_id;
142 ++next_id;
143
144 /// create a promise and put request in "queue"
145 promises.emplace(request_id, std::promise<BytesRead>{});
146 /// store id in AIO request for further identification
147 iocb.aio_data = request_id;
148
149 auto num_requests = 0;
150 struct iocb * requests[] { &iocb };
151
152 /// submit a request
153 while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0)
154 {
155 if (errno == EAGAIN)
156 /// wait until at least one event has been completed (or a spurious wakeup) and try again
157 have_resources.wait(lock);
158 else if (errno != EINTR)
159 throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
160 }
161
162 return promises[request_id].get_future();
163}
164
165AIOContextPool & AIOContextPool::instance()
166{
167 static AIOContextPool instance;
168 return instance;
169}
170
171}
172
173#endif
174