1#include <errno.h>
2#include <time.h>
3#include <optional>
4#include <Common/ProfileEvents.h>
5#include <Common/Stopwatch.h>
6#include <Common/Exception.h>
7#include <Common/CurrentMetrics.h>
8#include <IO/ReadBufferFromFileDescriptor.h>
9#include <IO/WriteHelpers.h>
10
11
12namespace ProfileEvents
13{
14 extern const Event ReadBufferFromFileDescriptorRead;
15 extern const Event ReadBufferFromFileDescriptorReadFailed;
16 extern const Event ReadBufferFromFileDescriptorReadBytes;
17 extern const Event DiskReadElapsedMicroseconds;
18 extern const Event Seek;
19}
20
21namespace CurrentMetrics
22{
23 extern const Metric Read;
24}
25
26namespace DB
27{
28
29namespace ErrorCodes
30{
31 extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
32 extern const int ARGUMENT_OUT_OF_BOUND;
33 extern const int CANNOT_SEEK_THROUGH_FILE;
34 extern const int CANNOT_SELECT;
35}
36
37
38std::string ReadBufferFromFileDescriptor::getFileName() const
39{
40 return "(fd = " + toString(fd) + ")";
41}
42
43
44bool ReadBufferFromFileDescriptor::nextImpl()
45{
46 size_t bytes_read = 0;
47 while (!bytes_read)
48 {
49 ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
50
51 Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC);
52
53 ssize_t res = 0;
54 {
55 CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
56 res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
57 }
58 if (!res)
59 break;
60
61 if (-1 == res && errno != EINTR)
62 {
63 ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
64 throwFromErrnoWithPath("Cannot read from file " + getFileName(), getFileName(),
65 ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
66 }
67
68 if (res > 0)
69 bytes_read += res;
70
71 /// It reports real time spent including the time spent while thread was preempted doing nothing.
72 /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
73 /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it (TaskStatsInfoGetter has about 500K RPS).
74 watch.stop();
75 ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
76
77 if (profile_callback)
78 {
79 ProfileInfo info;
80 info.bytes_requested = internal_buffer.size();
81 info.bytes_read = res;
82 info.nanoseconds = watch.elapsed();
83 profile_callback(info);
84 }
85 }
86
87 pos_in_file += bytes_read;
88
89 if (bytes_read)
90 {
91 ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
92 working_buffer.resize(bytes_read);
93 }
94 else
95 return false;
96
97 return true;
98}
99
100
101/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
102off_t ReadBufferFromFileDescriptor::doSeek(off_t offset, int whence)
103{
104 off_t new_pos = offset;
105 if (whence == SEEK_CUR)
106 new_pos = pos_in_file - (working_buffer.end() - pos) + offset;
107 else if (whence != SEEK_SET)
108 throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
109
110 /// Position is unchanged.
111 if (new_pos + (working_buffer.end() - pos) == pos_in_file)
112 return new_pos;
113
114 if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
115 {
116 /// Position is still inside buffer.
117 pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));
118 return new_pos;
119 }
120 else
121 {
122 ProfileEvents::increment(ProfileEvents::Seek);
123 Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC);
124
125 pos = working_buffer.end();
126 off_t res = ::lseek(fd, new_pos, SEEK_SET);
127 if (-1 == res)
128 throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
129 ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
130 pos_in_file = new_pos;
131
132 watch.stop();
133 ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
134
135 return res;
136 }
137}
138
139
140/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
141bool ReadBufferFromFileDescriptor::poll(size_t timeout_microseconds)
142{
143 fd_set fds;
144 FD_ZERO(&fds);
145 FD_SET(fd, &fds);
146 timeval timeout = { time_t(timeout_microseconds / 1000000), suseconds_t(timeout_microseconds % 1000000) };
147
148 int res = select(1, &fds, nullptr, nullptr, &timeout);
149
150 if (-1 == res)
151 throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
152
153 return res > 0;
154}
155
156}
157