1#include <Storages/MergeTree/MergedBlockOutputStream.h>
2#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
3#include <IO/createWriteBufferFromFileBase.h>
4#include <Common/escapeForFileName.h>
5#include <DataTypes/NestedUtils.h>
6#include <DataStreams/MarkInCompressedFile.h>
7#include <Common/StringUtils/StringUtils.h>
8#include <Common/typeid_cast.h>
9#include <Common/MemoryTracker.h>
10#include <Poco/File.h>
11
12
13namespace DB
14{
15
16namespace ErrorCodes
17{
18 extern const int BAD_ARGUMENTS;
19}
20
21
22MergedBlockOutputStream::MergedBlockOutputStream(
23 MergeTreeData & storage_,
24 const String & part_path_,
25 const NamesAndTypesList & columns_list_,
26 CompressionCodecPtr default_codec_,
27 bool blocks_are_granules_size_)
28 : IMergedBlockOutputStream(
29 storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
30 storage_.global_context.getSettings().max_compress_block_size, default_codec_,
31 storage_.global_context.getSettings().min_bytes_to_use_direct_io,
32 blocks_are_granules_size_,
33 std::vector<MergeTreeIndexPtr>(std::begin(storage_.skip_indices), std::end(storage_.skip_indices)),
34 {})
35 , columns_list(columns_list_)
36{
37 init();
38 for (const auto & it : columns_list)
39 {
40 const auto columns = storage.getColumns();
41 addStreams(part_path, it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec_), 0, false);
42 }
43}
44
45MergedBlockOutputStream::MergedBlockOutputStream(
46 MergeTreeData & storage_,
47 const String & part_path_,
48 const NamesAndTypesList & columns_list_,
49 CompressionCodecPtr default_codec_,
50 const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
51 size_t aio_threshold_,
52 bool blocks_are_granules_size_)
53 : IMergedBlockOutputStream(
54 storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
55 storage_.global_context.getSettings().max_compress_block_size, default_codec_,
56 aio_threshold_, blocks_are_granules_size_,
57 std::vector<MergeTreeIndexPtr>(std::begin(storage_.skip_indices), std::end(storage_.skip_indices)), {})
58 , columns_list(columns_list_)
59{
60 init();
61
62 /// If summary size is more than threshold than we will use AIO
63 size_t total_size = 0;
64 if (aio_threshold > 0)
65 {
66 for (const auto & it : columns_list)
67 {
68 auto it2 = merged_column_to_size_.find(it.name);
69 if (it2 != merged_column_to_size_.end())
70 total_size += it2->second;
71 }
72 }
73
74 for (const auto & it : columns_list)
75 {
76 const auto columns = storage.getColumns();
77 addStreams(part_path, it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec_), total_size, false);
78 }
79}
80
81std::string MergedBlockOutputStream::getPartPath() const
82{
83 return part_path;
84}
85
86/// If data is pre-sorted.
87void MergedBlockOutputStream::write(const Block & block)
88{
89 writeImpl(block, nullptr);
90}
91
92/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
93 * This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
94 */
95void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
96{
97 writeImpl(block, permutation);
98}
99
100void MergedBlockOutputStream::writeSuffix()
101{
102 throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
103}
104
105void MergedBlockOutputStream::writeSuffixAndFinalizePart(
106 MergeTreeData::MutableDataPartPtr & new_part,
107 const NamesAndTypesList * total_column_list,
108 MergeTreeData::DataPart::Checksums * additional_column_checksums)
109{
110 /// Finish columns serialization.
111 {
112 auto & settings = storage.global_context.getSettingsRef();
113 IDataType::SerializeBinaryBulkSettings serialize_settings;
114 serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
115 serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
116 WrittenOffsetColumns offset_columns;
117 auto it = columns_list.begin();
118 for (size_t i = 0; i < columns_list.size(); ++i, ++it)
119 {
120 if (!serialization_states.empty())
121 {
122 serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
123 it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
124 }
125
126 if (with_final_mark && rows_count != 0)
127 writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
128 }
129 }
130
131 if (with_final_mark && rows_count != 0)
132 index_granularity.appendMark(0); /// last mark
133
134 if (!total_column_list)
135 total_column_list = &columns_list;
136
137 /// Finish write and get checksums.
138 MergeTreeData::DataPart::Checksums checksums;
139
140 if (additional_column_checksums)
141 checksums = std::move(*additional_column_checksums);
142
143 if (index_stream)
144 {
145 if (with_final_mark && rows_count != 0)
146 {
147 for (size_t j = 0; j < index_columns.size(); ++j)
148 {
149 auto & column = *last_index_row[j].column;
150 index_columns[j]->insertFrom(column, 0); /// it has only one element
151 last_index_row[j].type->serializeBinary(column, 0, *index_stream);
152 }
153 last_index_row.clear();
154 }
155
156 index_stream->next();
157 checksums.files["primary.idx"].file_size = index_stream->count();
158 checksums.files["primary.idx"].file_hash = index_stream->getHash();
159 index_stream = nullptr;
160 }
161
162 for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
163 {
164 it->second->finalize();
165 it->second->addToChecksums(checksums);
166 }
167
168 finishSkipIndicesSerialization(checksums);
169
170 column_streams.clear();
171
172 if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
173 {
174 new_part->partition.store(storage, part_path, checksums);
175 if (new_part->minmax_idx.initialized)
176 new_part->minmax_idx.store(storage, part_path, checksums);
177 else if (rows_count)
178 throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
179 + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
180
181 WriteBufferFromFile count_out(part_path + "count.txt", 4096);
182 HashingWriteBuffer count_out_hashing(count_out);
183 writeIntText(rows_count, count_out_hashing);
184 count_out_hashing.next();
185 checksums.files["count.txt"].file_size = count_out_hashing.count();
186 checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
187 }
188
189 if (!new_part->ttl_infos.empty())
190 {
191 /// Write a file with ttl infos in json format.
192 WriteBufferFromFile out(part_path + "ttl.txt", 4096);
193 HashingWriteBuffer out_hashing(out);
194 new_part->ttl_infos.write(out_hashing);
195 checksums.files["ttl.txt"].file_size = out_hashing.count();
196 checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
197 }
198
199 {
200 /// Write a file with a description of columns.
201 WriteBufferFromFile out(part_path + "columns.txt", 4096);
202 total_column_list->writeText(out);
203 }
204
205 {
206 /// Write file with checksums.
207 WriteBufferFromFile out(part_path + "checksums.txt", 4096);
208 checksums.write(out);
209 }
210
211 new_part->rows_count = rows_count;
212 new_part->modification_time = time(nullptr);
213 new_part->columns = *total_column_list;
214 new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
215 new_part->checksums = checksums;
216 new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
217 new_part->index_granularity = index_granularity;
218}
219
220void MergedBlockOutputStream::init()
221{
222 Poco::File(part_path).createDirectories();
223
224 if (storage.hasPrimaryKey())
225 {
226 index_file_stream = std::make_unique<WriteBufferFromFile>(
227 part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
228 index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
229 }
230
231 initSkipIndices();
232}
233
234
235void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
236{
237 block.checkNumberOfRows();
238 size_t rows = block.rows();
239 if (!rows)
240 return;
241
242 /// Fill index granularity for this block
243 /// if it's unknown (in case of insert data or horizontal merge,
244 /// but not in case of vertical merge)
245 if (compute_granularity)
246 fillIndexGranularity(block);
247
248 /// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
249 WrittenOffsetColumns offset_columns;
250
251 auto primary_key_column_names = storage.primary_key_columns;
252 std::set<String> skip_indexes_column_names_set;
253 for (const auto & index : storage.skip_indices)
254 std::copy(index->columns.cbegin(), index->columns.cend(),
255 std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
256 Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
257
258 /// Here we will add the columns related to the Primary Key, then write the index.
259 std::vector<ColumnWithTypeAndName> primary_key_columns(primary_key_column_names.size());
260 std::map<String, size_t> primary_key_column_name_to_position;
261
262 for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
263 {
264 const auto & name = primary_key_column_names[i];
265
266 if (!primary_key_column_name_to_position.emplace(name, i).second)
267 throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
268
269 primary_key_columns[i] = block.getByName(name);
270
271 /// Reorder primary key columns in advance and add them to `primary_key_columns`.
272 if (permutation)
273 primary_key_columns[i].column = primary_key_columns[i].column->permute(*permutation, 0);
274 }
275
276 /// The same for skip indexes columns
277 std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size());
278 std::map<String, size_t> skip_indexes_column_name_to_position;
279
280 for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
281 {
282 const auto & name = skip_indexes_column_names[i];
283 skip_indexes_column_name_to_position.emplace(name, i);
284 skip_indexes_columns[i] = block.getByName(name);
285
286 /// Reorder index columns in advance.
287 if (permutation)
288 skip_indexes_columns[i].column = skip_indexes_columns[i].column->permute(*permutation, 0);
289 }
290
291 if (index_columns.empty())
292 {
293 index_columns.resize(primary_key_column_names.size());
294 last_index_row.resize(primary_key_column_names.size());
295 for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
296 {
297 index_columns[i] = primary_key_columns[i].column->cloneEmpty();
298 last_index_row[i] = primary_key_columns[i].cloneEmpty();
299 }
300 }
301
302 if (serialization_states.empty())
303 {
304 serialization_states.reserve(columns_list.size());
305 WrittenOffsetColumns tmp_offset_columns;
306 IDataType::SerializeBinaryBulkSettings settings;
307
308 for (const auto & col : columns_list)
309 {
310 settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
311 serialization_states.emplace_back(nullptr);
312 col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
313 }
314 }
315
316 size_t new_index_offset = 0;
317 /// Now write the data.
318 auto it = columns_list.begin();
319 for (size_t i = 0; i < columns_list.size(); ++i, ++it)
320 {
321 const ColumnWithTypeAndName & column = block.getByName(it->name);
322
323 if (permutation)
324 {
325 auto primary_column_it = primary_key_column_name_to_position.find(it->name);
326 auto skip_index_column_it = skip_indexes_column_name_to_position.find(it->name);
327 if (primary_key_column_name_to_position.end() != primary_column_it)
328 {
329 const auto & primary_column = *primary_key_columns[primary_column_it->second].column;
330 std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_mark);
331 }
332 else if (skip_indexes_column_name_to_position.end() != skip_index_column_it)
333 {
334 const auto & index_column = *skip_indexes_columns[skip_index_column_it->second].column;
335 std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
336 }
337 else
338 {
339 /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
340 ColumnPtr permuted_column = column.column->permute(*permutation, 0);
341 std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_mark);
342 }
343 }
344 else
345 {
346 std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_mark);
347 }
348 }
349
350 rows_count += rows;
351
352 /// Should be written before index offset update, because we calculate,
353 /// indices of currently written granules
354 calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
355
356 {
357 /** While filling index (index_columns), disable memory tracker.
358 * Because memory is allocated here (maybe in context of INSERT query),
359 * but then freed in completely different place (while merging parts), where query memory_tracker is not available.
360 * And otherwise it will look like excessively growing memory consumption in context of query.
361 * (observed in long INSERT SELECTs)
362 */
363 auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
364
365 /// Write index. The index contains Primary Key value for each `index_granularity` row.
366 for (size_t i = index_offset; i < rows;)
367 {
368 if (storage.hasPrimaryKey())
369 {
370 for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
371 {
372 const IColumn & primary_column = *primary_key_columns[j].column.get();
373 index_columns[j]->insertFrom(primary_column, i);
374 primary_key_columns[j].type->serializeBinary(primary_column, i, *index_stream);
375 }
376 }
377
378 ++current_mark;
379 if (current_mark < index_granularity.getMarksCount())
380 i += index_granularity.getMarkRows(current_mark);
381 else
382 break;
383 }
384 }
385
386 /// store last index row to write final mark at the end of column
387 for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
388 {
389 const IColumn & primary_column = *primary_key_columns[j].column.get();
390 auto mutable_column = std::move(*last_index_row[j].column).mutate();
391 if (!mutable_column->empty())
392 mutable_column->popBack(1);
393 mutable_column->insertFrom(primary_column, rows - 1);
394 last_index_row[j].column = std::move(mutable_column);
395 }
396
397 index_offset = new_index_offset;
398}
399
400}
401