1 | #include <Storages/MergeTree/MergeTreeReaderStream.h> |
2 | #include <Poco/File.h> |
3 | |
4 | |
5 | namespace DB |
6 | { |
7 | |
namespace ErrorCodes
{
    /// Error codes referenced by this translation unit (defined centrally elsewhere).
    extern const int LOGICAL_ERROR;
    extern const int CORRUPTED_DATA;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int ARGUMENT_OUT_OF_BOUND;
}
15 | |
16 | |
/// Opens a compressed data file of a part and sizes its read buffer from the mark
/// ranges that will actually be read, so we never allocate more than needed
/// (capped by max_read_buffer_size).
MergeTreeReaderStream::MergeTreeReaderStream(
    const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
    const MarkRanges & all_mark_ranges,
    MarkCache * mark_cache_, bool save_marks_in_cache_,
    UncompressedCache * uncompressed_cache,
    size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
    const MergeTreeIndexGranularityInfo * index_granularity_info_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
    : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
    , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
    , index_granularity_info(index_granularity_info_)
{
    /// Compute the size of the buffer: the largest contiguous compressed byte span
    /// among the requested ranges, and the total bytes across all ranges.
    size_t max_mark_range_bytes = 0;
    size_t sum_mark_range_bytes = 0;

    /// Care should be taken to not load marks when the part is empty (marks_count == 0).

    for (const auto & mark_range : all_mark_ranges)
    {
        size_t left_mark = mark_range.begin;
        size_t right_mark = mark_range.end;

        /// NOTE: if we are reading the whole file, then right_mark == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            /// Advance right_mark past every mark that points into the same compressed
            /// block as the range end: the whole block must be read to decompress any part of it.
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        size_t mark_range_bytes;

        /// If there are no marks after the end of range, just use file size
        if (right_mark >= marks_count
            || (right_mark + 1 == marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file))
        {
            mark_range_bytes = file_size - (left_mark < marks_count ? getMark(left_mark).offset_in_compressed_file : 0);
        }
        else
        {
            /// Both marks exist: the range size is the distance between their compressed offsets.
            mark_range_bytes = getMark(right_mark).offset_in_compressed_file - getMark(left_mark).offset_in_compressed_file;
        }

        max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
        sum_mark_range_bytes += mark_range_bytes;
    }

    /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
    /// For example: part has single dictionary and all marks point to the same position.
    if (max_mark_range_bytes == 0)
        max_mark_range_bytes = max_read_buffer_size;

    size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);

    /// Initialize the objects that shall be used to perform read operations.
    /// sum_mark_range_bytes is an estimate of the total bytes to read — presumably used
    /// together with aio_threshold to choose the read method inside the buffer
    /// implementation (confirm against CompressedReadBufferFromFile).
    if (uncompressed_cache)
    {
        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
            path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(buffer);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto buffer = std::make_unique<CompressedReadBufferFromFile>(
            path_prefix + data_file_extension, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(buffer);
        data_buffer = non_cached_buffer.get();
    }
}
102 | |
103 | |
104 | const MarkInCompressedFile & MergeTreeReaderStream::getMark(size_t index) |
105 | { |
106 | if (!marks) |
107 | loadMarks(); |
108 | return (*marks)[index]; |
109 | } |
110 | |
111 | |
/// Loads the marks file for this stream into `marks`, going through the mark cache
/// when one was provided. Throws CORRUPTED_DATA on size mismatch and
/// CANNOT_READ_ALL_DATA if the file could not be fully consumed.
void MergeTreeReaderStream::loadMarks()
{
    std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix);

    auto load = [&]() -> MarkCache::MappedPtr
    {
        /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Validate the file size up-front so a truncated or corrupted marks file is
        /// reported explicitly rather than producing garbage mark offsets.
        size_t file_size = Poco::File(mrk_path).getSize();
        size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
        if (expected_file_size != file_size)
            throw Exception(
                "Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
                ErrorCodes::CORRUPTED_DATA);

        auto res = std::make_shared<MarksInCompressedFile>(marks_count);

        if (!index_granularity_info->is_adaptive)
        {
            /// Read directly to marks. Non-adaptive marks match the in-memory layout of
            /// MarkInCompressedFile, so the whole file can land straight in the array.
            ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));

            if (buffer.eof() || buffer.buffer().size() != file_size)
                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        else
        {
            /// Adaptive marks carry one extra size_t per mark (presumably the number of
            /// rows in the granule — confirm against the adaptive mark file format),
            /// which we skip after reading the two offsets.
            ReadBufferFromFile buffer(mrk_path, file_size, -1);
            size_t i = 0;
            while (!buffer.eof())
            {
                readIntBinary((*res)[i].offset_in_compressed_file, buffer);
                readIntBinary((*res)[i].offset_in_decompressed_block, buffer);
                buffer.seek(sizeof(size_t), SEEK_CUR);
                ++i;
            }
            /// The size pre-check above guarantees i never exceeds marks_count here.
            if (i * index_granularity_info->mark_size_in_bytes != file_size)
                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        res->protect();    /// Marks may be shared between queries; make the memory read-only.
        return res;
    };

    if (mark_cache)
    {
        auto key = mark_cache->hash(mrk_path);
        if (save_marks_in_cache)
        {
            marks = mark_cache->getOrSet(key, load);
        }
        else
        {
            /// Reuse a cached copy if present, but do not populate the cache on a miss.
            marks = mark_cache->get(key);
            if (!marks)
                marks = load();
        }
    }
    else
        marks = load();

    if (!marks)
        throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
}
176 | |
177 | |
178 | void MergeTreeReaderStream::seekToMark(size_t index) |
179 | { |
180 | MarkInCompressedFile mark = getMark(index); |
181 | |
182 | try |
183 | { |
184 | if (cached_buffer) |
185 | cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); |
186 | if (non_cached_buffer) |
187 | non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); |
188 | } |
189 | catch (Exception & e) |
190 | { |
191 | /// Better diagnostics. |
192 | if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) |
193 | e.addMessage("(while seeking to mark " + toString(index) |
194 | + " of column " + path_prefix + "; offsets are: " |
195 | + toString(mark.offset_in_compressed_file) + " " |
196 | + toString(mark.offset_in_decompressed_block) + ")" ); |
197 | |
198 | throw; |
199 | } |
200 | } |
201 | |
202 | |
203 | void MergeTreeReaderStream::seekToStart() |
204 | { |
205 | try |
206 | { |
207 | if (cached_buffer) |
208 | cached_buffer->seek(0, 0); |
209 | if (non_cached_buffer) |
210 | non_cached_buffer->seek(0, 0); |
211 | } |
212 | catch (Exception & e) |
213 | { |
214 | /// Better diagnostics. |
215 | if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) |
216 | e.addMessage("(while seeking to start of column " + path_prefix + ")" ); |
217 | |
218 | throw; |
219 | } |
220 | } |
221 | |
222 | } |
223 | |