1#pragma once
2
3#if defined(__linux__) || defined(__FreeBSD__)
4
5#include <IO/WriteBufferFromFileBase.h>
6#include <IO/WriteBuffer.h>
7#include <IO/BufferWithOwnMemory.h>
8#include <IO/AIO.h>
9#include <Core/Defines.h>
10#include <Common/CurrentMetrics.h>
11
12#include <string>
13#include <unistd.h>
14#include <fcntl.h>
15
16
17namespace CurrentMetrics
18{
19 extern const Metric OpenFileForWrite;
20}
21
22namespace DB
23{
24
25/** Class for asynchronous data writing.
26 */
27class WriteBufferAIO : public WriteBufferFromFileBase
28{
29public:
30 WriteBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
31 char * existing_memory_ = nullptr);
32 ~WriteBufferAIO() override;
33
34 WriteBufferAIO(const WriteBufferAIO &) = delete;
35 WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
36
37 off_t getPositionInFile() override;
38 void sync() override;
39 std::string getFileName() const override { return filename; }
40 int getFD() const override { return fd; }
41
42private:
43 void nextImpl() override;
44 off_t doSeek(off_t off, int whence) override;
45 void doTruncate(off_t length) override;
46
47 /// If there's still data in the buffer, we'll write them.
48 void flush();
49 /// Wait for the end of the current asynchronous task.
50 bool waitForAIOCompletion();
51 /// Prepare an asynchronous request.
52 void prepare();
53 ///
54 void finalize() override;
55
56private:
57 /// Buffer for asynchronous data writes.
58 BufferWithOwnMemory<WriteBuffer> flush_buffer;
59
60 /// Description of the asynchronous write request.
61 iocb request{};
62 iocb * request_ptr{&request};
63
64 AIOContext aio_context{1};
65
66 const std::string filename;
67
68 /// The number of bytes to be written to the disk.
69 off_t bytes_to_write = 0;
70 /// Number of bytes written with the last request.
71 off_t bytes_written = 0;
72 /// The number of zero bytes to be cut from the end of the file
73 /// after the data write operation completes.
74 off_t truncation_count = 0;
75
76 /// The current position in the file.
77 off_t pos_in_file = 0;
78 /// The maximum position reached in the file.
79 off_t max_pos_in_file = 0;
80
81 /// The starting position of the aligned region of the disk to which the data is written.
82 off_t region_aligned_begin = 0;
83 /// The size of the aligned region of the disk.
84 size_t region_aligned_size = 0;
85
86 /// The file descriptor for writing.
87 int fd = -1;
88
89 /// The data buffer that we want to write to the disk.
90 Position buffer_begin = nullptr;
91
92 /// Is the asynchronous write operation still in progress?
93 bool is_pending_write = false;
94 /// Did the asynchronous operation fail?
95 bool aio_failed = false;
96
97 CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
98};
99
100}
101
102#endif
103