1#if defined(__linux__) || defined(__FreeBSD__)
2
3#include <IO/ReadBufferAIO.h>
4#include <IO/AIOContextPool.h>
5#include <Common/ProfileEvents.h>
6#include <Common/Stopwatch.h>
7#include <Core/Defines.h>
8
9#include <sys/types.h>
10#include <sys/stat.h>
11#include <errno.h>
12
13#include <optional>
14
15
16namespace ProfileEvents
17{
18 extern const Event FileOpen;
19 extern const Event ReadBufferAIORead;
20 extern const Event ReadBufferAIOReadBytes;
21}
22
23namespace CurrentMetrics
24{
25 extern const Metric Read;
26}
27
28namespace DB
29{
30
31namespace ErrorCodes
32{
33 extern const int FILE_DOESNT_EXIST;
34 extern const int CANNOT_OPEN_FILE;
35 extern const int LOGICAL_ERROR;
36 extern const int ARGUMENT_OUT_OF_BOUND;
37 extern const int AIO_READ_ERROR;
38}
39
40
41/// Note: an additional page is allocated that will contain the data that
42/// does not fit into the main buffer.
43ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_)
44 : ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
45 fill_buffer(BufferWithOwnMemory<ReadBuffer>(internalBuffer().size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
46 filename(filename_)
47{
48 ProfileEvents::increment(ProfileEvents::FileOpen);
49
50 int open_flags = (flags_ == -1) ? O_RDONLY : flags_;
51 open_flags |= O_DIRECT;
52
53 fd = ::open(filename.c_str(), open_flags);
54 if (fd == -1)
55 {
56 auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
57 throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
58 }
59}
60
61ReadBufferAIO::~ReadBufferAIO()
62{
63 if (!aio_failed)
64 {
65 try
66 {
67 (void) waitForAIOCompletion();
68 }
69 catch (...)
70 {
71 tryLogCurrentException(__PRETTY_FUNCTION__);
72 }
73 }
74
75 if (fd != -1)
76 ::close(fd);
77}
78
79void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
80{
81 if (is_started)
82 throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR);
83 max_bytes_read = max_bytes_read_;
84}
85
86bool ReadBufferAIO::nextImpl()
87{
88 /// If the end of the file has already been reached by calling this function,
89 /// then the current call is wrong.
90 if (is_eof)
91 return false;
92
93 std::optional<Stopwatch> watch;
94 if (profile_callback)
95 watch.emplace(clock_type);
96
97 if (!is_aio)
98 {
99 synchronousRead();
100 is_aio = true;
101 }
102 else
103 receive();
104
105 if (profile_callback)
106 {
107 ProfileInfo info;
108 info.bytes_requested = requested_byte_count;
109 info.bytes_read = bytes_read;
110 info.nanoseconds = watch->elapsed();
111 profile_callback(info);
112 }
113
114 is_started = true;
115
116 /// If the end of the file is just reached, do nothing else.
117 if (is_eof)
118 return bytes_read != 0;
119
120 /// Create an asynchronous request.
121 prepare();
122
123#if defined(__FreeBSD__)
124 request.aio.aio_lio_opcode = LIO_READ;
125 request.aio.aio_fildes = fd;
126 request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
127 request.aio.aio_nbytes = region_aligned_size;
128 request.aio.aio_offset = region_aligned_begin;
129#else
130 request.aio_lio_opcode = IOCB_CMD_PREAD;
131 request.aio_fildes = fd;
132 request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
133 request.aio_nbytes = region_aligned_size;
134 request.aio_offset = region_aligned_begin;
135#endif
136
137 /// Send the request.
138 try
139 {
140 future_bytes_read = AIOContextPool::instance().post(request);
141 }
142 catch (...)
143 {
144 aio_failed = true;
145 throw;
146 }
147
148 is_pending_read = true;
149 return true;
150}
151
152off_t ReadBufferAIO::doSeek(off_t off, int whence)
153{
154 off_t new_pos_in_file;
155
156 if (whence == SEEK_SET)
157 {
158 if (off < 0)
159 throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
160 new_pos_in_file = off;
161 }
162 else if (whence == SEEK_CUR)
163 {
164 if (off >= 0)
165 {
166 if (off > (std::numeric_limits<off_t>::max() - getPositionInFile()))
167 throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
168 }
169 else if (off < -getPositionInFile())
170 throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
171 new_pos_in_file = getPositionInFile() + off;
172 }
173 else
174 throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
175
176 if (new_pos_in_file != getPositionInFile())
177 {
178 off_t first_read_pos_in_file = first_unread_pos_in_file - static_cast<off_t>(working_buffer.size());
179 if (hasPendingData() && (new_pos_in_file >= first_read_pos_in_file) && (new_pos_in_file <= first_unread_pos_in_file))
180 {
181 /// Moved, but remained within the buffer.
182 pos = working_buffer.begin() + (new_pos_in_file - first_read_pos_in_file);
183 }
184 else
185 {
186 /// Moved past the buffer.
187 pos = working_buffer.end();
188 first_unread_pos_in_file = new_pos_in_file;
189
190 /// If we go back, than it's not eof
191 is_eof = false;
192
193 /// We can not use the result of the current asynchronous request.
194 skip();
195 }
196 }
197
198 return new_pos_in_file;
199}
200
201void ReadBufferAIO::synchronousRead()
202{
203 CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read};
204
205 prepare();
206 bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin);
207
208 ProfileEvents::increment(ProfileEvents::ReadBufferAIORead);
209 ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read);
210
211 finalize();
212}
213
214void ReadBufferAIO::receive()
215{
216 if (!waitForAIOCompletion())
217 return;
218 finalize();
219}
220
221void ReadBufferAIO::skip()
222{
223 if (!waitForAIOCompletion())
224 return;
225
226 is_aio = false;
227
228 /// @todo I presume this assignment is redundant since waitForAIOCompletion() performs a similar one
229// bytes_read = future_bytes_read.get();
230 if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
231 throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
232}
233
234bool ReadBufferAIO::waitForAIOCompletion()
235{
236 if (is_eof || !is_pending_read)
237 return false;
238
239 CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read};
240
241 bytes_read = future_bytes_read.get();
242 is_pending_read = false;
243
244 ProfileEvents::increment(ProfileEvents::ReadBufferAIORead);
245 ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read);
246
247 return true;
248}
249
250void ReadBufferAIO::prepare()
251{
252 requested_byte_count = std::min(fill_buffer.internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE, max_bytes_read);
253
254 /// Region of the disk from which we want to read data.
255 const off_t region_begin = first_unread_pos_in_file;
256
257 if ((requested_byte_count > static_cast<size_t>(std::numeric_limits<off_t>::max())) ||
258 (first_unread_pos_in_file > (std::numeric_limits<off_t>::max() - static_cast<off_t>(requested_byte_count))))
259 throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
260
261 const off_t region_end = first_unread_pos_in_file + requested_byte_count;
262
263 /// The aligned region of the disk from which we will read the data.
264 region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
265 const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;
266
267 region_aligned_begin = region_begin - region_left_padding;
268
269 if (region_end > (std::numeric_limits<off_t>::max() - static_cast<off_t>(region_right_padding)))
270 throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
271
272 const off_t region_aligned_end = region_end + region_right_padding;
273 region_aligned_size = region_aligned_end - region_aligned_begin;
274
275 buffer_begin = fill_buffer.internalBuffer().begin();
276}
277
278void ReadBufferAIO::finalize()
279{
280 if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
281 throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
282
283 /// Ignore redundant bytes on the left.
284 bytes_read -= region_left_padding;
285
286 /// Ignore redundant bytes on the right.
287 bytes_read = std::min(static_cast<off_t>(bytes_read), static_cast<off_t>(requested_byte_count));
288
289 if (bytes_read > 0)
290 fill_buffer.buffer().resize(region_left_padding + bytes_read);
291 if (static_cast<size_t>(bytes_read) < requested_byte_count)
292 is_eof = true;
293
294 if (first_unread_pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
295 throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
296
297 first_unread_pos_in_file += bytes_read;
298 total_bytes_read += bytes_read;
299 working_buffer_offset = region_left_padding;
300
301 if (total_bytes_read == max_bytes_read)
302 is_eof = true;
303
304 /// Swap the main and duplicate buffers.
305 swap(fill_buffer);
306}
307
308}
309
310#endif
311