#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/escapeForFileName.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Common/typeid_cast.h>


namespace DB
{

namespace
{
    using OffsetColumns = std::map<std::string, ColumnPtr>;

    constexpr auto DATA_FILE_EXTENSION = ".bin";
}

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NOT_FOUND_EXPECTED_DATA_PART;
    extern const int MEMORY_LIMIT_EXCEEDED;
    extern const int ARGUMENT_OUT_OF_BOUND;
}


MergeTreeReader::~MergeTreeReader() = default;


MergeTreeReader::MergeTreeReader(
    String path_,
    MergeTreeData::DataPartPtr data_part_,
    NamesAndTypesList columns_,
    UncompressedCache * uncompressed_cache_,
    MarkCache * mark_cache_,
    bool save_marks_in_cache_,
    const MergeTreeData & storage_,
    MarkRanges all_mark_ranges_,
    size_t aio_threshold_,
    size_t max_read_buffer_size_,
    ValueSizeMap avg_value_size_hints_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
    clockid_t clock_type_)
    : data_part(std::move(data_part_))
    , avg_value_size_hints(std::move(avg_value_size_hints_))
    , path(std::move(path_)), columns(std::move(columns_))
    , uncompressed_cache(uncompressed_cache_)
    , mark_cache(mark_cache_)
    , save_marks_in_cache(save_marks_in_cache_)
    , storage(storage_)
    , all_mark_ranges(std::move(all_mark_ranges_))
    , aio_threshold(aio_threshold_)
    , max_read_buffer_size(max_read_buffer_size_)
{
    try
    {
        for (const NameAndTypePair & column : columns)
            addStreams(column.name, *column.type, profile_callback_, clock_type_);
    }
    catch (...)
    {
        storage.reportBrokenPart(data_part->name);
        throw;
    }
}


const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() const
{
    return avg_value_size_hints;
}

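/// Reads up to max_rows_to_read rows starting from from_mark (or continuing from the current
/// position if continue_reading is set) and appends them to res_columns. Columns that end up
/// empty are reset to nullptr so that missing values can be filled in later.
/// Returns the number of rows actually read.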
size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
    size_t read_rows = 0;
    try
    {
        size_t num_columns = columns.size();

        if (res_columns.size() != num_columns)
            throw Exception("invalid number of columns passed to MergeTreeReader::readRows. "
                            "Expected " + toString(num_columns) + ", "
                            "got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);

        /// Pointers to offset columns that are common to the nested data structure columns.
        /// If append is true, then the value will be nullptr and will be used only to
        /// check that the offsets column has already been read.
        OffsetColumns offset_columns;

        auto name_and_type = columns.begin();
        for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
        {
            auto & [name, type] = *name_and_type;

            /// The column is already present in the block so we will append the values to the end.
            bool append = res_columns[pos] != nullptr;
            if (!append)
                res_columns[pos] = type->createColumn();

            /// To keep offsets shared. TODO Very dangerous. Get rid of this.
            MutableColumnPtr column = res_columns[pos]->assumeMutable();

            bool read_offsets = true;

            /// For nested data structures collect pointers to offset columns.
            if (const auto * type_arr = typeid_cast<const DataTypeArray *>(type.get()))
            {
                String table_name = Nested::extractTableName(name);

                auto it_inserted = offset_columns.emplace(table_name, nullptr);

                /// Offsets have already been read on a previous iteration, so there is no need to read them again.
                if (!it_inserted.second)
                    read_offsets = false;

                /// need to create new offsets
                if (it_inserted.second && !append)
                    it_inserted.first->second = ColumnArray::ColumnOffsets::create();

                /// share offsets in all elements of nested structure
                if (!append)
                    column = ColumnArray::create(type_arr->getNestedType()->createColumn(),
                        it_inserted.first->second)->assumeMutable();
            }

            try
            {
                size_t column_size_before_reading = column->size();

                readData(name, *type, *column, from_mark, continue_reading, max_rows_to_read, read_offsets);

                /// For elements of Nested, column_size_before_reading may be greater than column size
                /// if offsets are not empty and were already read, but elements are empty.
                if (!column->empty())
                    read_rows = std::max(read_rows, column->size() - column_size_before_reading);
            }
            catch (Exception & e)
            {
                /// Better diagnostics.
                e.addMessage("(while reading column " + name + ")");
                throw;
            }

            if (column->empty())
                res_columns[pos] = nullptr;
            else
                res_columns[pos] = std::move(column);
        }

        /// NOTE: positions for all streams must be kept in sync.
        /// In particular, even if for some streams there are no rows to be read,
        /// you must ensure that no seeks are skipped and that at this point all streams point to the same target mark.
    }
    catch (Exception & e)
    {
        if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
            storage.reportBrokenPart(data_part->name);

        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + " "
                     "from mark " + toString(from_mark) + " "
                     "with max_rows_to_read = " + toString(max_rows_to_read) + ")");
        throw;
    }
    catch (...)
    {
        storage.reportBrokenPart(data_part->name);

        throw;
    }

    return read_rows;
}

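/// Creates a MergeTreeReaderStream for every substream of the column (e.g. array sizes and
/// array elements) that has a data file in this part. Substreams without a data file are
/// skipped; see the comment inside about adding columns to tables with existing parts.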
void MergeTreeReader::addStreams(const String & name, const IDataType & type,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
    IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
    {
        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        if (streams.count(stream_name))
            return;

        bool data_file_exists = data_part->checksums.files.count(stream_name + DATA_FILE_EXTENSION);

        /** If the data file is missing, we will not try to open it.
          * This is necessary so that a new column can be added to the table structure
          * without creating files for old parts.
          */
        if (!data_file_exists)
            return;

        streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
            path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
            all_mark_ranges, mark_cache, save_marks_in_cache,
            uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
            aio_threshold, max_read_buffer_size,
            &data_part->index_granularity_info,
            profile_callback, clock_type));
    };

    IDataType::SubstreamPath substream_path;
    type.enumerateStreams(callback, substream_path);
}

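/// Deserializes one column from its substreams. If with_offsets is false, the array sizes
/// substream is skipped because the shared offsets were already read while processing
/// another column of the same nested structure.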
void MergeTreeReader::readData(
    const String & name, const IDataType & type, IColumn & column,
    size_t from_mark, bool continue_reading, size_t max_rows_to_read,
    bool with_offsets)
{
    auto get_stream_getter = [&](bool stream_for_prefix) -> IDataType::InputStreamGetter
    {
        return [&, stream_for_prefix](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *
        {
            /// If offsets for arrays have already been read.
            if (!with_offsets && substream_path.size() == 1 && substream_path[0].type == IDataType::Substream::ArraySizes)
                return nullptr;

            String stream_name = IDataType::getFileNameForStream(name, substream_path);

            auto it = streams.find(stream_name);
            if (it == streams.end())
                return nullptr;

            MergeTreeReaderStream & stream = *it->second;

            if (stream_for_prefix)
            {
                stream.seekToStart();
                continue_reading = false;
            }
            else if (!continue_reading)
                stream.seekToMark(from_mark);

            return stream.data_buffer;
        };
    };

    double & avg_value_size_hint = avg_value_size_hints[name];
    IDataType::DeserializeBinaryBulkSettings settings;
    settings.avg_value_size_hint = avg_value_size_hint;

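    /// Read the serialization prefix only on the first read of this column from the part;
    /// the deserialization state is cached and reused by subsequent calls.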
    if (deserialize_binary_bulk_state_map.count(name) == 0)
    {
        settings.getter = get_stream_getter(true);
        type.deserializeBinaryBulkStatePrefix(settings, deserialize_binary_bulk_state_map[name]);
    }

    settings.getter = get_stream_getter(false);
    settings.continuous_reading = continue_reading;
    auto & deserialize_state = deserialize_binary_bulk_state_map[name];
    type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_state);
    IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
}


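/// Returns true if the array column is in an incomplete state: its offsets claim that
/// elements exist (the last offset is non-zero) while no element data has been read.
/// This happens when the offsets were read from a sibling column of a nested structure
/// but the data file of this column is missing from the part.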
static bool arrayHasNoElementsRead(const IColumn & column)
{
    const auto * column_array = typeid_cast<const ColumnArray *>(&column);

    if (!column_array)
        return false;

    size_t size = column_array->size();
    if (!size)
        return false;

    size_t data_size = column_array->getData().size();
    if (data_size)
        return false;

    size_t last_offset = column_array->getOffsets()[size - 1];
    return last_offset != 0;
}


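/// Fills the columns that could not be read from the part. Columns that have a default
/// expression are left as nullptr and should_evaluate_missing_defaults is set, so that the
/// caller can invoke evaluateMissingDefaults. Other columns are filled with default values;
/// for missing elements of a nested structure, offsets of sibling arrays are reused so that
/// the resulting arrays have the correct lengths.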
void MergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows)
{
    try
    {
        size_t num_columns = columns.size();

        if (res_columns.size() != num_columns)
            throw Exception("invalid number of columns passed to MergeTreeReader::fillMissingColumns. "
                            "Expected " + toString(num_columns) + ", "
                            "got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);

        /// For a missing column of a nested data structure we must create not a column of empty
        /// arrays, but a column of arrays of the correct length.

        /// First, collect offset columns for all arrays in the block.
        OffsetColumns offset_columns;
        auto requested_column = columns.begin();
        for (size_t i = 0; i < num_columns; ++i, ++requested_column)
        {
            if (res_columns[i] == nullptr)
                continue;

            if (const auto * array = typeid_cast<const ColumnArray *>(res_columns[i].get()))
            {
                String offsets_name = Nested::extractTableName(requested_column->name);
                auto & offsets_column = offset_columns[offsets_name];

                /// If for some reason multiple offsets columns are present for the same nested data structure,
                /// choose the one that is not empty.
                if (!offsets_column || offsets_column->empty())
                    offsets_column = array->getOffsetsPtr();
            }
        }

        should_evaluate_missing_defaults = false;

        /// insert default values only for columns without default expressions
        requested_column = columns.begin();
        for (size_t i = 0; i < num_columns; ++i, ++requested_column)
        {
            auto & [name, type] = *requested_column;

            if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i]))
                res_columns[i] = nullptr;

            if (res_columns[i] == nullptr)
            {
                if (storage.getColumns().hasDefault(name))
                {
                    should_evaluate_missing_defaults = true;
                    continue;
                }

                String offsets_name = Nested::extractTableName(name);
                auto offset_it = offset_columns.find(offsets_name);
                if (offset_it != offset_columns.end())
                {
                    ColumnPtr offsets_column = offset_it->second;
                    DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*type).getNestedType();
                    size_t nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();

                    ColumnPtr nested_column =
                        nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();

                    res_columns[i] = ColumnArray::create(nested_column, offsets_column);
                }
                else
                {
                    /// We must turn a constant column into a full column because the interpreter could infer
                    /// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
                    res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
                }
            }
        }
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + ")");
        throw;
    }
}

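/// Evaluates DEFAULT expressions for the columns that are still missing after
/// fillMissingColumns, using the columns that were read (and additional_columns) as input.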
void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns & res_columns)
{
    try
    {
        size_t num_columns = columns.size();

        if (res_columns.size() != num_columns)
            throw Exception("invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. "
                            "Expected " + toString(num_columns) + ", "
                            "got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);

        /// Convert columns list to block.
        /// TODO: rewrite with columns interface. This will be possible after changes in ExpressionActions.
        auto name_and_type = columns.begin();
        for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
        {
            if (res_columns[pos] == nullptr)
                continue;

            additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
        }

        DB::evaluateMissingDefaults(additional_columns, columns, storage.getColumns().getDefaults(), storage.global_context);

        /// Move columns from block.
        name_and_type = columns.begin();
        for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
            res_columns[pos] = std::move(additional_columns.getByName(name_and_type->name).column);
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + ")");
        throw;
    }
}

}