1#pragma once
2
3#if defined(__linux__) || defined(__FreeBSD__)
4
5#include <IO/ReadBufferFromFileBase.h>
6#include <IO/ReadBuffer.h>
7#include <IO/BufferWithOwnMemory.h>
8#include <IO/AIO.h>
9#include <Core/Defines.h>
10#include <Common/CurrentMetrics.h>
11#include <string>
12#include <limits>
13#include <future>
14#include <unistd.h>
15#include <fcntl.h>
16
17
18namespace CurrentMetrics
19{
20 extern const Metric OpenFileForRead;
21}
22
23namespace DB
24{
25
26/** Class for asynchronous data reading.
27 */
28class ReadBufferAIO : public ReadBufferFromFileBase
29{
30public:
31 ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1,
32 char * existing_memory_ = nullptr);
33 ~ReadBufferAIO() override;
34
35 ReadBufferAIO(const ReadBufferAIO &) = delete;
36 ReadBufferAIO & operator=(const ReadBufferAIO &) = delete;
37
38 void setMaxBytes(size_t max_bytes_read_);
39 off_t getPositionInFile() override { return first_unread_pos_in_file - (working_buffer.end() - pos); }
40 std::string getFileName() const override { return filename; }
41 int getFD() const override { return fd; }
42
43private:
44 ///
45 bool nextImpl() override;
46 ///
47 off_t doSeek(off_t off, int whence) override;
48 /// Synchronously read the data.
49 void synchronousRead();
50 /// Get data from an asynchronous request.
51 void receive();
52 /// Ignore data from an asynchronous request.
53 void skip();
54 /// Wait for the end of the current asynchronous task.
55 bool waitForAIOCompletion();
56 /// Prepare the request.
57 void prepare();
58 /// Prepare for reading a duplicate buffer containing data from
59 /// of the last request.
60 void finalize();
61
62private:
63 /// Buffer for asynchronous data read operations.
64 BufferWithOwnMemory<ReadBuffer> fill_buffer;
65
66 /// Description of the asynchronous read request.
67 iocb request{};
68 std::future<ssize_t> future_bytes_read;
69
70 const std::string filename;
71
72 /// The maximum number of bytes that can be read.
73 size_t max_bytes_read = std::numeric_limits<size_t>::max();
74 /// Number of bytes requested.
75 size_t requested_byte_count = 0;
76 /// The number of bytes read at the last request.
77 ssize_t bytes_read = 0;
78 /// The total number of bytes read.
79 size_t total_bytes_read = 0;
80
81 /// The position of the first unread byte in the file.
82 off_t first_unread_pos_in_file = 0;
83
84 /// The starting position of the aligned region of the disk from which the data is read.
85 off_t region_aligned_begin = 0;
86 /// Left offset to align the region of the disk.
87 size_t region_left_padding = 0;
88 /// The size of the aligned region of the disk.
89 size_t region_aligned_size = 0;
90
91 /// The file descriptor for read.
92 int fd = -1;
93
94 /// The buffer to which the received data is written.
95 Position buffer_begin = nullptr;
96
97 /// The asynchronous read operation is not yet completed.
98 bool is_pending_read = false;
99 /// The end of the file is reached.
100 bool is_eof = false;
101 /// At least one read request was sent.
102 bool is_started = false;
103 /// Is the operation asynchronous?
104 bool is_aio = false;
105 /// Did the asynchronous operation fail?
106 bool aio_failed = false;
107
108 CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
109};
110
111}
112
113#endif
114