1#if defined(__linux__) || defined(__FreeBSD__)
2
3#include <IO/WriteBufferAIO.h>
4#include <Common/MemorySanitizer.h>
5#include <Common/ProfileEvents.h>
6
7#include <limits>
8#include <sys/types.h>
9#include <sys/stat.h>
10#include <errno.h>
11
12
13namespace ProfileEvents
14{
15 extern const Event FileOpen;
16 extern const Event WriteBufferAIOWrite;
17 extern const Event WriteBufferAIOWriteBytes;
18}
19
20namespace CurrentMetrics
21{
22 extern const Metric Write;
23}
24
25namespace DB
26{
27
28namespace ErrorCodes
29{
30 extern const int FILE_DOESNT_EXIST;
31 extern const int CANNOT_OPEN_FILE;
32 extern const int LOGICAL_ERROR;
33 extern const int ARGUMENT_OUT_OF_BOUND;
34 extern const int AIO_READ_ERROR;
35 extern const int AIO_WRITE_ERROR;
36 extern const int CANNOT_IO_SUBMIT;
37 extern const int CANNOT_IO_GETEVENTS;
38 extern const int CANNOT_TRUNCATE_FILE;
39 extern const int CANNOT_FSYNC;
40}
41
42
43/// Note: an additional page is allocated that will contain data that
44/// do not fit into the main buffer.
45WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
46 char * existing_memory_)
47 : WriteBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
48 flush_buffer(BufferWithOwnMemory<WriteBuffer>(this->memory.size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
49 filename(filename_)
50{
51 ProfileEvents::increment(ProfileEvents::FileOpen);
52
53 /// Correct the buffer size information so that additional pages do not touch the base class `BufferBase`.
54 this->buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
55 this->internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
56 flush_buffer.buffer().resize(this->buffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
57 flush_buffer.internalBuffer().resize(this->internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE);
58
59 int open_flags = (flags_ == -1) ? (O_RDWR | O_TRUNC | O_CREAT) : flags_;
60 open_flags |= O_DIRECT;
61
62 fd = ::open(filename.c_str(), open_flags, mode_);
63 if (fd == -1)
64 {
65 auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
66 throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code);
67 }
68}
69
70WriteBufferAIO::~WriteBufferAIO()
71{
72 if (!aio_failed)
73 {
74 try
75 {
76 flush();
77 }
78 catch (...)
79 {
80 tryLogCurrentException(__PRETTY_FUNCTION__);
81 }
82 }
83
84 if (fd != -1)
85 ::close(fd);
86}
87
88off_t WriteBufferAIO::getPositionInFile()
89{
90 return seek(0, SEEK_CUR);
91}
92
93void WriteBufferAIO::sync()
94{
95 flush();
96
97 /// Ask OS to flush data to disk.
98 int res = ::fsync(fd);
99 if (res == -1)
100 throwFromErrnoWithPath("Cannot fsync " + getFileName(), getFileName(), ErrorCodes::CANNOT_FSYNC);
101}
102
103void WriteBufferAIO::nextImpl()
104{
105 if (!offset())
106 return;
107
108 if (waitForAIOCompletion())
109 finalize();
110
111 /// Create a request for asynchronous write.
112 prepare();
113
114#if defined(__FreeBSD__)
115 request.aio.aio_lio_opcode = LIO_WRITE;
116 request.aio.aio_fildes = fd;
117 request.aio.aio_buf = reinterpret_cast<volatile void *>(buffer_begin);
118 request.aio.aio_nbytes = region_aligned_size;
119 request.aio.aio_offset = region_aligned_begin;
120#else
121 request.aio_lio_opcode = IOCB_CMD_PWRITE;
122 request.aio_fildes = fd;
123 request.aio_buf = reinterpret_cast<UInt64>(buffer_begin);
124 request.aio_nbytes = region_aligned_size;
125 request.aio_offset = region_aligned_begin;
126#endif
127
128 /// Send the request.
129 while (io_submit(aio_context.ctx, 1, &request_ptr) < 0)
130 {
131 if (errno != EINTR)
132 {
133 aio_failed = true;
134 throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::CANNOT_IO_SUBMIT);
135 }
136 }
137
138 is_pending_write = true;
139}
140
141off_t WriteBufferAIO::doSeek(off_t off, int whence)
142{
143 flush();
144
145 if (whence == SEEK_SET)
146 {
147 if (off < 0)
148 throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
149 pos_in_file = off;
150 }
151 else if (whence == SEEK_CUR)
152 {
153 if (off >= 0)
154 {
155 if (off > (std::numeric_limits<off_t>::max() - pos_in_file))
156 throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
157 }
158 else if (off < -pos_in_file)
159 throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
160 pos_in_file += off;
161 }
162 else
163 throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
164
165 if (pos_in_file > max_pos_in_file)
166 max_pos_in_file = pos_in_file;
167
168 return pos_in_file;
169}
170
171void WriteBufferAIO::doTruncate(off_t length)
172{
173 flush();
174
175 int res = ::ftruncate(fd, length);
176 if (res == -1)
177 throwFromErrnoWithPath("Cannot truncate file " + filename, filename, ErrorCodes::CANNOT_TRUNCATE_FILE);
178}
179
180void WriteBufferAIO::flush()
181{
182 next();
183 if (waitForAIOCompletion())
184 finalize();
185}
186
187bool WriteBufferAIO::waitForAIOCompletion()
188{
189 if (!is_pending_write)
190 return false;
191
192 CurrentMetrics::Increment metric_increment_write{CurrentMetrics::Write};
193
194 io_event event;
195 while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) < 0)
196 {
197 if (errno != EINTR)
198 {
199 aio_failed = true;
200 throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::CANNOT_IO_GETEVENTS);
201 }
202 }
203
204 // Unpoison the memory returned from an uninstrumented system function.
205 __msan_unpoison(&event, sizeof(event));
206
207 is_pending_write = false;
208#if defined(__FreeBSD__)
209 bytes_written = aio_return(reinterpret_cast<struct aiocb *>(event.udata));
210#else
211 bytes_written = event.res;
212#endif
213
214 ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
215 ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
216
217 return true;
218}
219
/// Prepares the next write request: swaps the buffers and lays out the data to be written so that
/// both its position in the file and its size are multiples of DEFAULT_AIO_FILE_BLOCK_SIZE.
/// The missing bytes on the left and on the right are filled with data read from the file
/// or with zeros where the file has no data.
/// Sets `region_aligned_begin`, `region_aligned_size`, `bytes_to_write`, `buffer_begin` and `truncation_count`.
/// Throws LOGICAL_ERROR on position overflow and AIO_READ_ERROR if reading the existing data fails.
void WriteBufferAIO::prepare()
{
    /// Swap the main and duplicate buffers.
    /// From here on `flush_buffer` holds the data to write.
    swap(flush_buffer);

    truncation_count = 0;

    /*
        A page on disk or in memory

        start address (starting position in case of disk) is a multiple of DEFAULT_AIO_FILE_BLOCK_SIZE
        :
        :
        +---------------+
        |               |
        |               |
        |               |
        |               |
        |               |
        |               |
        +---------------+
        <--------------->
                :
                :
        DEFAULT_AIO_FILE_BLOCK_SIZE

    */

    /*
        Representation of data on a disk

        XXX : the data you want to write
        ZZZ : data that is already on disk or zeros, if there is no data

        region_aligned_begin                                           region_aligned_end
        :   region_begin                                          region_end            :
        :   :                                                              :            :
        :   :                                                              :            :
        +---:-----------+---------------+---------------+---------------+--:------------+
        |   :           |               |               |               |  :            |
        |   +-----------+---------------+---------------+---------------+--+            |
        |ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
        |ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
        |   +-----------+---------------+---------------+---------------+--+            |
        |               |               |               |               |               |
        +---------------+---------------+---------------+---------------+---------------+

        <--><--------------------------------------------------------------><----------->
         :                                 :                                      :
         :                                 :                                      :
        region_left_padding           region_size                    region_right_padding

        <------------------------------------------------------------------------------->
                                                :
                                                :
                                       region_aligned_size
    */

    /// Region of the disk in which we want to write data.
    const off_t region_begin = pos_in_file;

    /// Make sure that `pos_in_file + flush_buffer.offset()` is representable as off_t.
    if ((flush_buffer.offset() > static_cast<size_t>(std::numeric_limits<off_t>::max())) ||
        (pos_in_file > (std::numeric_limits<off_t>::max() - static_cast<off_t>(flush_buffer.offset()))))
        throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);

    const off_t region_end = pos_in_file + flush_buffer.offset();
    const size_t region_size = region_end - region_begin;

    /// The aligned region of the disk into which we want to write the data.
    /// The outer `%` turns a right padding of a whole block (region_end is already aligned) into zero.
    const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE;
    const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE;

    region_aligned_begin = region_begin - region_left_padding;

    /// Make sure that `region_end + region_right_padding` is representable as off_t.
    if (region_end > (std::numeric_limits<off_t>::max() - static_cast<off_t>(region_right_padding)))
        throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);

    const off_t region_aligned_end = region_end + region_right_padding;
    region_aligned_size = region_aligned_end - region_aligned_begin;

    /// finalize() compares the result of the request with this value.
    bytes_to_write = region_aligned_size;

    /*
        Representing data in the buffer before processing

        XXX : the data you want to write

        buffer_begin                                         buffer_end
        :                                                             :
        :                                                             :
        +---------------+---------------+---------------+-------------:-+
        |               |               |               |             : |
        +---------------+---------------+---------------+-------------+ |
        |XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
        |XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXX| |
        +---------------+---------------+---------------+-------------+ |
        |               |               |               |               |
        +---------------+---------------+---------------+---------------+

        <------------------------------------------------------------->
                                       :
                                       :
                                  buffer_size
    */

    /// The buffer of data that we want to write to the disk.
    buffer_begin = flush_buffer.buffer().begin();
    Position buffer_end = buffer_begin + region_size;
    size_t buffer_size = buffer_end - buffer_begin;

    /// Process the buffer so that it reflects the structure of the disk region.

    /*
        Representation of data in the buffer after processing

        XXX : the data you want to write
        ZZZ : data from disk or zeros, if there is no data

        `buffer_begin`                                          `buffer_end`  extra page
        :                                                                  :      :
        :                                                                  :      :
        +---:-----------+---------------+---------------+---------------+--:------------+
        |               |               |               |               |  :            |
        |   +-----------+---------------+---------------+---------------+--+            |
        |ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
        |ZZZ|XXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XXXXXXXXXXXXXXX|XX|ZZZZZZZZZZZZ|
        |   +-----------+---------------+---------------+---------------+--+            |
        |               |               |               |               |               |
        +---------------+---------------+---------------+---------------+---------------+

        <--><--------------------------------------------------------------><----------->
         :                                 :                                      :
         :                                 :                                      :
        region_left_padding           region_size                    region_right_padding

        <------------------------------------------------------------------------------->
                                                :
                                                :
                                       region_aligned_size
    */

    if ((region_left_padding > 0) || (region_right_padding > 0))
    {
        /// Scratch page for reading existing file contents; it is aligned to the block size.
        char memory_page[DEFAULT_AIO_FILE_BLOCK_SIZE] __attribute__ ((aligned (DEFAULT_AIO_FILE_BLOCK_SIZE)));

        if (region_left_padding > 0)
        {
            /// Move the buffer data to the right. Complete the beginning of the buffer with data from the disk.
            buffer_size += region_left_padding;
            buffer_end = buffer_begin + buffer_size;

            /// The number of bytes moved equals the original `region_size`.
            ::memmove(buffer_begin + region_left_padding, buffer_begin, (buffer_size - region_left_padding) * sizeof(*buffer_begin));

            /// Read the first block of the aligned region to get the bytes that precede our data.
            ssize_t read_count = ::pread(fd, memory_page, DEFAULT_AIO_FILE_BLOCK_SIZE, region_aligned_begin);
            if (read_count < 0)
                throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);

            /// Take what the file has; the rest of the left padding (if the read was short) becomes zeros.
            size_t to_copy = std::min(static_cast<size_t>(read_count), region_left_padding);
            ::memcpy(buffer_begin, memory_page, to_copy * sizeof(*buffer_begin));
            ::memset(buffer_begin + to_copy, 0, (region_left_padding - to_copy) * sizeof(*buffer_begin));
        }

        if (region_right_padding > 0)
        {
            /// Add the end of the buffer with data from the disk.
            /// The last block of the aligned region is read for that.
            ssize_t read_count = ::pread(fd, memory_page, DEFAULT_AIO_FILE_BLOCK_SIZE, region_aligned_end - DEFAULT_AIO_FILE_BLOCK_SIZE);
            if (read_count < 0)
                throw Exception("Read error", ErrorCodes::AIO_READ_ERROR);

            Position truncation_begin;
            /// Offset inside the last block at which our own data ends.
            off_t offset = DEFAULT_AIO_FILE_BLOCK_SIZE - region_right_padding;
            if (read_count > offset)
            {
                /// The file already has data past the end of our region: keep it
                /// and zero only the part of the block that the file does not cover.
                ::memcpy(buffer_end, memory_page + offset, (read_count - offset) * sizeof(*buffer_end));
                truncation_begin = buffer_end + (read_count - offset);
                truncation_count = DEFAULT_AIO_FILE_BLOCK_SIZE - read_count;
            }
            else
            {
                /// The file has no data past the end of our region: the whole right padding is zeros.
                truncation_begin = buffer_end;
                truncation_count = region_right_padding;
            }

            /// These zeros are written only for alignment; finalize() cuts them off again with ftruncate.
            ::memset(truncation_begin, 0, truncation_count * sizeof(*truncation_begin));
        }
    }
}
407
408void WriteBufferAIO::finalize()
409{
410 if (bytes_written < bytes_to_write)
411 throw Exception("Asynchronous write error on file " + filename, ErrorCodes::AIO_WRITE_ERROR);
412
413 bytes_written -= truncation_count;
414
415#if defined(__FreeBSD__)
416 off_t aio_offset = request.aio.aio_offset;
417#else
418 off_t aio_offset = request.aio_offset;
419#endif
420 off_t pos_offset = bytes_written - (pos_in_file - aio_offset);
421
422 if (pos_in_file > (std::numeric_limits<off_t>::max() - pos_offset))
423 throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR);
424 pos_in_file += pos_offset;
425
426 if (pos_in_file > max_pos_in_file)
427 max_pos_in_file = pos_in_file;
428
429 if (truncation_count > 0)
430 {
431 /// Truncate the file to remove unnecessary zeros from it.
432 int res = ::ftruncate(fd, max_pos_in_file);
433 if (res == -1)
434 throwFromErrnoWithPath("Cannot truncate file " + filename, filename, ErrorCodes::CANNOT_TRUNCATE_FILE);
435 }
436}
437
438}
439
440#endif
441