1 | #if defined(__linux__) || defined(__FreeBSD__) |
2 | |
3 | #include <IO/ReadBufferAIO.h> |
4 | #include <IO/AIOContextPool.h> |
5 | #include <Common/ProfileEvents.h> |
6 | #include <Common/Stopwatch.h> |
7 | #include <Core/Defines.h> |
8 | |
9 | #include <sys/types.h> |
10 | #include <sys/stat.h> |
11 | #include <errno.h> |
12 | |
13 | #include <optional> |
14 | |
15 | |
16 | namespace ProfileEvents |
17 | { |
18 | extern const Event FileOpen; |
19 | extern const Event ReadBufferAIORead; |
20 | extern const Event ReadBufferAIOReadBytes; |
21 | } |
22 | |
23 | namespace CurrentMetrics |
24 | { |
25 | extern const Metric Read; |
26 | } |
27 | |
28 | namespace DB |
29 | { |
30 | |
31 | namespace ErrorCodes |
32 | { |
33 | extern const int FILE_DOESNT_EXIST; |
34 | extern const int CANNOT_OPEN_FILE; |
35 | extern const int LOGICAL_ERROR; |
36 | extern const int ARGUMENT_OUT_OF_BOUND; |
37 | extern const int AIO_READ_ERROR; |
38 | } |
39 | |
40 | |
41 | /// Note: an additional page is allocated that will contain the data that |
42 | /// does not fit into the main buffer. |
43 | ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_) |
44 | : ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), |
45 | fill_buffer(BufferWithOwnMemory<ReadBuffer>(internalBuffer().size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), |
46 | filename(filename_) |
47 | { |
48 | ProfileEvents::increment(ProfileEvents::FileOpen); |
49 | |
50 | int open_flags = (flags_ == -1) ? O_RDONLY : flags_; |
51 | open_flags |= O_DIRECT; |
52 | |
53 | fd = ::open(filename.c_str(), open_flags); |
54 | if (fd == -1) |
55 | { |
56 | auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE; |
57 | throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code); |
58 | } |
59 | } |
60 | |
61 | ReadBufferAIO::~ReadBufferAIO() |
62 | { |
63 | if (!aio_failed) |
64 | { |
65 | try |
66 | { |
67 | (void) waitForAIOCompletion(); |
68 | } |
69 | catch (...) |
70 | { |
71 | tryLogCurrentException(__PRETTY_FUNCTION__); |
72 | } |
73 | } |
74 | |
75 | if (fd != -1) |
76 | ::close(fd); |
77 | } |
78 | |
79 | void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_) |
80 | { |
81 | if (is_started) |
82 | throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR); |
83 | max_bytes_read = max_bytes_read_; |
84 | } |
85 | |
86 | bool ReadBufferAIO::nextImpl() |
87 | { |
88 | /// If the end of the file has already been reached by calling this function, |
89 | /// then the current call is wrong. |
90 | if (is_eof) |
91 | return false; |
92 | |
93 | std::optional<Stopwatch> watch; |
94 | if (profile_callback) |
95 | watch.emplace(clock_type); |
96 | |
97 | if (!is_aio) |
98 | { |
99 | synchronousRead(); |
100 | is_aio = true; |
101 | } |
102 | else |
103 | receive(); |
104 | |
105 | if (profile_callback) |
106 | { |
107 | ProfileInfo info; |
108 | info.bytes_requested = requested_byte_count; |
109 | info.bytes_read = bytes_read; |
110 | info.nanoseconds = watch->elapsed(); |
111 | profile_callback(info); |
112 | } |
113 | |
114 | is_started = true; |
115 | |
116 | /// If the end of the file is just reached, do nothing else. |
117 | if (is_eof) |
118 | return bytes_read != 0; |
119 | |
120 | /// Create an asynchronous request. |
121 | prepare(); |
122 | |
123 | #if defined(__FreeBSD__) |
124 | request.aio.aio_lio_opcode = LIO_READ; |
125 | request.aio.aio_fildes = fd; |
126 | request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin); |
127 | request.aio.aio_nbytes = region_aligned_size; |
128 | request.aio.aio_offset = region_aligned_begin; |
129 | #else |
130 | request.aio_lio_opcode = IOCB_CMD_PREAD; |
131 | request.aio_fildes = fd; |
132 | request.aio_buf = reinterpret_cast<UInt64>(buffer_begin); |
133 | request.aio_nbytes = region_aligned_size; |
134 | request.aio_offset = region_aligned_begin; |
135 | #endif |
136 | |
137 | /// Send the request. |
138 | try |
139 | { |
140 | future_bytes_read = AIOContextPool::instance().post(request); |
141 | } |
142 | catch (...) |
143 | { |
144 | aio_failed = true; |
145 | throw; |
146 | } |
147 | |
148 | is_pending_read = true; |
149 | return true; |
150 | } |
151 | |
152 | off_t ReadBufferAIO::doSeek(off_t off, int whence) |
153 | { |
154 | off_t new_pos_in_file; |
155 | |
156 | if (whence == SEEK_SET) |
157 | { |
158 | if (off < 0) |
159 | throw Exception("SEEK_SET underflow" , ErrorCodes::ARGUMENT_OUT_OF_BOUND); |
160 | new_pos_in_file = off; |
161 | } |
162 | else if (whence == SEEK_CUR) |
163 | { |
164 | if (off >= 0) |
165 | { |
166 | if (off > (std::numeric_limits<off_t>::max() - getPositionInFile())) |
167 | throw Exception("SEEK_CUR overflow" , ErrorCodes::ARGUMENT_OUT_OF_BOUND); |
168 | } |
169 | else if (off < -getPositionInFile()) |
170 | throw Exception("SEEK_CUR underflow" , ErrorCodes::ARGUMENT_OUT_OF_BOUND); |
171 | new_pos_in_file = getPositionInFile() + off; |
172 | } |
173 | else |
174 | throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence" , ErrorCodes::ARGUMENT_OUT_OF_BOUND); |
175 | |
176 | if (new_pos_in_file != getPositionInFile()) |
177 | { |
178 | off_t first_read_pos_in_file = first_unread_pos_in_file - static_cast<off_t>(working_buffer.size()); |
179 | if (hasPendingData() && (new_pos_in_file >= first_read_pos_in_file) && (new_pos_in_file <= first_unread_pos_in_file)) |
180 | { |
181 | /// Moved, but remained within the buffer. |
182 | pos = working_buffer.begin() + (new_pos_in_file - first_read_pos_in_file); |
183 | } |
184 | else |
185 | { |
186 | /// Moved past the buffer. |
187 | pos = working_buffer.end(); |
188 | first_unread_pos_in_file = new_pos_in_file; |
189 | |
190 | /// If we go back, than it's not eof |
191 | is_eof = false; |
192 | |
193 | /// We can not use the result of the current asynchronous request. |
194 | skip(); |
195 | } |
196 | } |
197 | |
198 | return new_pos_in_file; |
199 | } |
200 | |
201 | void ReadBufferAIO::synchronousRead() |
202 | { |
203 | CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read}; |
204 | |
205 | prepare(); |
206 | bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin); |
207 | |
208 | ProfileEvents::increment(ProfileEvents::ReadBufferAIORead); |
209 | ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read); |
210 | |
211 | finalize(); |
212 | } |
213 | |
214 | void ReadBufferAIO::receive() |
215 | { |
216 | if (!waitForAIOCompletion()) |
217 | return; |
218 | finalize(); |
219 | } |
220 | |
221 | void ReadBufferAIO::skip() |
222 | { |
223 | if (!waitForAIOCompletion()) |
224 | return; |
225 | |
226 | is_aio = false; |
227 | |
228 | /// @todo I presume this assignment is redundant since waitForAIOCompletion() performs a similar one |
229 | // bytes_read = future_bytes_read.get(); |
230 | if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding)) |
231 | throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); |
232 | } |
233 | |
234 | bool ReadBufferAIO::waitForAIOCompletion() |
235 | { |
236 | if (is_eof || !is_pending_read) |
237 | return false; |
238 | |
239 | CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read}; |
240 | |
241 | bytes_read = future_bytes_read.get(); |
242 | is_pending_read = false; |
243 | |
244 | ProfileEvents::increment(ProfileEvents::ReadBufferAIORead); |
245 | ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read); |
246 | |
247 | return true; |
248 | } |
249 | |
250 | void ReadBufferAIO::prepare() |
251 | { |
252 | requested_byte_count = std::min(fill_buffer.internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE, max_bytes_read); |
253 | |
254 | /// Region of the disk from which we want to read data. |
255 | const off_t region_begin = first_unread_pos_in_file; |
256 | |
257 | if ((requested_byte_count > static_cast<size_t>(std::numeric_limits<off_t>::max())) || |
258 | (first_unread_pos_in_file > (std::numeric_limits<off_t>::max() - static_cast<off_t>(requested_byte_count)))) |
259 | throw Exception("An overflow occurred during file operation" , ErrorCodes::LOGICAL_ERROR); |
260 | |
261 | const off_t region_end = first_unread_pos_in_file + requested_byte_count; |
262 | |
263 | /// The aligned region of the disk from which we will read the data. |
264 | region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE; |
265 | const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE; |
266 | |
267 | region_aligned_begin = region_begin - region_left_padding; |
268 | |
269 | if (region_end > (std::numeric_limits<off_t>::max() - static_cast<off_t>(region_right_padding))) |
270 | throw Exception("An overflow occurred during file operation" , ErrorCodes::LOGICAL_ERROR); |
271 | |
272 | const off_t region_aligned_end = region_end + region_right_padding; |
273 | region_aligned_size = region_aligned_end - region_aligned_begin; |
274 | |
275 | buffer_begin = fill_buffer.internalBuffer().begin(); |
276 | } |
277 | |
278 | void ReadBufferAIO::finalize() |
279 | { |
280 | if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding)) |
281 | throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); |
282 | |
283 | /// Ignore redundant bytes on the left. |
284 | bytes_read -= region_left_padding; |
285 | |
286 | /// Ignore redundant bytes on the right. |
287 | bytes_read = std::min(static_cast<off_t>(bytes_read), static_cast<off_t>(requested_byte_count)); |
288 | |
289 | if (bytes_read > 0) |
290 | fill_buffer.buffer().resize(region_left_padding + bytes_read); |
291 | if (static_cast<size_t>(bytes_read) < requested_byte_count) |
292 | is_eof = true; |
293 | |
294 | if (first_unread_pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read)) |
295 | throw Exception("An overflow occurred during file operation" , ErrorCodes::LOGICAL_ERROR); |
296 | |
297 | first_unread_pos_in_file += bytes_read; |
298 | total_bytes_read += bytes_read; |
299 | working_buffer_offset = region_left_padding; |
300 | |
301 | if (total_bytes_read == max_bytes_read) |
302 | is_eof = true; |
303 | |
304 | /// Swap the main and duplicate buffers. |
305 | swap(fill_buffer); |
306 | } |
307 | |
308 | } |
309 | |
310 | #endif |
311 | |