1#include <Storages/MergeTree/MergeTreeReaderStream.h>
2#include <Poco/File.h>
3
4
5namespace DB
6{
7
namespace ErrorCodes
{
    /// Error codes referenced by this translation unit. The actual integer
    /// values are defined in a central ErrorCodes source file elsewhere.
    extern const int LOGICAL_ERROR;
    extern const int CORRUPTED_DATA;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int ARGUMENT_OUT_OF_BOUND;
}
15
16
/// Opens a (possibly cached) compressed read buffer over one data file of a part.
/// The buffer size is estimated from the mark ranges that will actually be read,
/// so that we do not allocate max_read_buffer_size when only a small span is needed.
MergeTreeReaderStream::MergeTreeReaderStream(
    const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
    const MarkRanges & all_mark_ranges,
    MarkCache * mark_cache_, bool save_marks_in_cache_,
    UncompressedCache * uncompressed_cache,
    size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
    const MergeTreeIndexGranularityInfo * index_granularity_info_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
    : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
    , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
    , index_granularity_info(index_granularity_info_)
{
    /// Compute the size of the buffer.
    /// max_mark_range_bytes: largest single contiguous span we will read (bounds the buffer size);
    /// sum_mark_range_bytes: total bytes across all ranges (passed to the buffer as an estimate).
    size_t max_mark_range_bytes = 0;
    size_t sum_mark_range_bytes = 0;

    /// Care should be taken to not load marks when the part is empty (marks_count == 0).
    /// getMark() lazily loads the marks file on first call, so every call below is guarded
    /// by a `< marks_count` check.

    for (const auto & mark_range : all_mark_ranges)
    {
        size_t left_mark = mark_range.begin;
        size_t right_mark = mark_range.end;

        /// NOTE: if we are reading the whole file, then right_mark == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        /// Advance right_mark past all marks that point into the same compressed block
        /// as the range end, so the whole block is covered by the estimate.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        size_t mark_range_bytes;

        /// If there are no marks after the end of range, just use file size.
        /// The second condition handles the case where the last mark still points into
        /// the same compressed block as the range end: the range extends to end of file.
        if (right_mark >= marks_count
            || (right_mark + 1 == marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file))
        {
            mark_range_bytes = file_size - (left_mark < marks_count ? getMark(left_mark).offset_in_compressed_file : 0);
        }
        else
        {
            mark_range_bytes = getMark(right_mark).offset_in_compressed_file - getMark(left_mark).offset_in_compressed_file;
        }

        max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
        sum_mark_range_bytes += mark_range_bytes;
    }

    /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
    /// For example: part has single dictionary and all marks point to the same position.
    if (max_mark_range_bytes == 0)
        max_mark_range_bytes = max_read_buffer_size;

    size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);

    /// Initialize the objects that shall be used to perform read operations.
    /// Only one of cached_buffer / non_cached_buffer is set; data_buffer is a
    /// non-owning pointer to whichever one was created.
    if (uncompressed_cache)
    {
        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
            path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(buffer);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto buffer = std::make_unique<CompressedReadBufferFromFile>(
            path_prefix + data_file_extension, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(buffer);
        data_buffer = non_cached_buffer.get();
    }
}
102
103
104const MarkInCompressedFile & MergeTreeReaderStream::getMark(size_t index)
105{
106 if (!marks)
107 loadMarks();
108 return (*marks)[index];
109}
110
111
/// Loads the marks file for this stream into `marks`, using the mark cache when available.
/// Validates the file size against the expected mark count before reading and throws
/// CORRUPTED_DATA / CANNOT_READ_ALL_DATA on mismatch.
void MergeTreeReaderStream::loadMarks()
{
    std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix);

    auto load = [&]() -> MarkCache::MappedPtr
    {
        /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Sanity check: the marks file must be exactly marks_count entries of fixed size.
        size_t file_size = Poco::File(mrk_path).getSize();
        size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
        if (expected_file_size != file_size)
            throw Exception(
                "Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
                ErrorCodes::CORRUPTED_DATA);

        auto res = std::make_shared<MarksInCompressedFile>(marks_count);

        if (!index_granularity_info->is_adaptive)
        {
            /// Non-adaptive format: the file is exactly the in-memory mark array,
            /// so read directly into the marks storage with no per-entry parsing.
            ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));

            if (buffer.eof() || buffer.buffer().size() != file_size)
                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        else
        {
            /// Adaptive format: each mark carries an extra trailing size_t
            /// (presumably the rows-per-granule count — not needed here), which we skip.
            ReadBufferFromFile buffer(mrk_path, file_size, -1);
            size_t i = 0;
            while (!buffer.eof())
            {
                readIntBinary((*res)[i].offset_in_compressed_file, buffer);
                readIntBinary((*res)[i].offset_in_decompressed_block, buffer);
                buffer.seek(sizeof(size_t), SEEK_CUR);
                ++i;
            }
            /// If the file ended mid-entry, fewer bytes than mark_size_in_bytes per mark were consumed.
            if (i * index_granularity_info->mark_size_in_bytes != file_size)
                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        /// Marks are immutable after load; protect() guards the shared data against accidental writes.
        res->protect();
        return res;
    };

    if (mark_cache)
    {
        auto key = mark_cache->hash(mrk_path);
        if (save_marks_in_cache)
        {
            /// Load-or-fetch atomically through the cache.
            marks = mark_cache->getOrSet(key, load);
        }
        else
        {
            /// Use a cached copy if present, but do not populate the cache on miss.
            marks = mark_cache->get(key);
            if (!marks)
                marks = load();
        }
    }
    else
        marks = load();

    if (!marks)
        throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
}
176
177
178void MergeTreeReaderStream::seekToMark(size_t index)
179{
180 MarkInCompressedFile mark = getMark(index);
181
182 try
183 {
184 if (cached_buffer)
185 cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
186 if (non_cached_buffer)
187 non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
188 }
189 catch (Exception & e)
190 {
191 /// Better diagnostics.
192 if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
193 e.addMessage("(while seeking to mark " + toString(index)
194 + " of column " + path_prefix + "; offsets are: "
195 + toString(mark.offset_in_compressed_file) + " "
196 + toString(mark.offset_in_decompressed_block) + ")");
197
198 throw;
199 }
200}
201
202
203void MergeTreeReaderStream::seekToStart()
204{
205 try
206 {
207 if (cached_buffer)
208 cached_buffer->seek(0, 0);
209 if (non_cached_buffer)
210 non_cached_buffer->seek(0, 0);
211 }
212 catch (Exception & e)
213 {
214 /// Better diagnostics.
215 if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
216 e.addMessage("(while seeking to start of column " + path_prefix + ")");
217
218 throw;
219 }
220}
221
222}
223