#include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <IO/createWriteBufferFromFileBase.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

namespace
{
    constexpr auto DATA_FILE_EXTENSION = ".bin";
    constexpr auto INDEX_FILE_EXTENSION = ".idx";
}


IMergedBlockOutputStream::IMergedBlockOutputStream(
    MergeTreeData & storage_,
    const String & part_path_,
    size_t min_compress_block_size_,
    size_t max_compress_block_size_,
    CompressionCodecPtr codec_,
    size_t aio_threshold_,
    bool blocks_are_granules_size_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
    const MergeTreeIndexGranularity & index_granularity_,
    const MergeTreeIndexGranularityInfo * index_granularity_info_)
    : storage(storage_)
    , part_path(part_path_)
    , min_compress_block_size(min_compress_block_size_)
    , max_compress_block_size(max_compress_block_size_)
    , aio_threshold(aio_threshold_)
    , can_use_adaptive_granularity(index_granularity_info_ ? index_granularity_info_->is_adaptive : storage.canUseAdaptiveGranularity())
    , marks_file_extension(can_use_adaptive_granularity ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension())
    , blocks_are_granules_size(blocks_are_granules_size_)
    , index_granularity(index_granularity_)
    , compute_granularity(index_granularity.empty())
    , codec(std::move(codec_))
    , skip_indices(indices_to_recalc)
    , with_final_mark(storage.getSettings()->write_final_mark && can_use_adaptive_granularity)
{
    if (blocks_are_granules_size && !index_granularity.empty())
        throw Exception("Can't take information about index granularity from blocks, when a non-empty index_granularity array is specified", ErrorCodes::LOGICAL_ERROR);
}

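/// Create a data (.bin) stream and a marks stream for every substream of the column's type.
/// Streams shared between columns of a Nested type (array sizes) are created only once.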
void IMergedBlockOutputStream::addStreams(
    const String & path,
    const String & name,
    const IDataType & type,
    const CompressionCodecPtr & effective_codec,
    size_t estimated_size,
    bool skip_offsets)
{
    IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
    {
        if (skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Shared offsets for Nested type.
        if (column_streams.count(stream_name))
            return;

        column_streams[stream_name] = std::make_unique<ColumnStream>(
            stream_name,
            path + stream_name, DATA_FILE_EXTENSION,
            path + stream_name, marks_file_extension,
            effective_codec,
            max_compress_block_size,
            estimated_size,
            aio_threshold);
    };

    IDataType::SubstreamPath stream_path;
    type.enumerateStreams(callback, stream_path);
}


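/// Returns a getter that maps a substream path to the compressed buffer of the corresponding stream,
/// or nullptr when offsets are skipped or have already been written for this Nested column.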
IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
    const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
    return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets && skip_offsets)
            return nullptr;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return nullptr;

        return &column_streams[stream_name]->compressed;
    };
}

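/// Decide how many rows form one granule of this block. With adaptive granularity the row count is
/// derived from index_granularity_bytes and the average in-memory row size (or the whole block when
/// blocks_are_granules); otherwise the fixed index_granularity setting is used as-is.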
static void fillIndexGranularityImpl(
    const Block & block,
    size_t index_granularity_bytes,
    size_t fixed_index_granularity_rows,
    bool blocks_are_granules,
    size_t index_offset,
    MergeTreeIndexGranularity & index_granularity,
    bool can_use_adaptive_index_granularity)
{
    size_t rows_in_block = block.rows();
    size_t index_granularity_for_block;
    if (!can_use_adaptive_index_granularity)
        index_granularity_for_block = fixed_index_granularity_rows;
    else
    {
        size_t block_size_in_memory = block.bytes();
        if (blocks_are_granules)
            index_granularity_for_block = rows_in_block;
        else if (block_size_in_memory >= index_granularity_bytes)
        {
            size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
            index_granularity_for_block = rows_in_block / granules_in_block;
        }
        else
        {
            size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
            index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
        }
    }
    if (index_granularity_for_block == 0) /// very rare case when index granularity bytes is less than a single row
        index_granularity_for_block = 1;

    /// We should be less than or equal to the fixed index granularity.
    index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
        index_granularity.appendMark(index_granularity_for_block);
}

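/// Fill index_granularity for the given block according to the storage settings.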
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
    const auto storage_settings = storage.getSettings();
    fillIndexGranularityImpl(
        block,
        storage_settings->index_granularity_bytes,
        storage_settings->index_granularity,
        blocks_are_granules_size,
        index_offset,
        index_granularity,
        can_use_adaptive_granularity);
}

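/// Append one mark to the .mrk file of every substream: the offset in the compressed data file,
/// the offset inside the decompressed block and, with adaptive granularity, the number of rows.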
void IMergedBlockOutputStream::writeSingleMark(
    const String & name,
    const IDataType & type,
    WrittenOffsetColumns & offset_columns,
    bool skip_offsets,
    size_t number_of_rows,
    DB::IDataType::SubstreamPath & path)
{
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets && skip_offsets)
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return;

        ColumnStream & stream = *column_streams[stream_name];

        /// There could already be enough data to compress into the new block.
        if (stream.compressed.offset() >= min_compress_block_size)
            stream.compressed.next();

        writeIntBinary(stream.plain_hashing.count(), stream.marks);
        writeIntBinary(stream.compressed.offset(), stream.marks);
        if (can_use_adaptive_granularity)
            writeIntBinary(number_of_rows, stream.marks);
    }, path);
}

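/// Serialize number_of_rows rows of the column starting at from_row, optionally preceded by a mark.
/// Returns the index of the first row after the written granule.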
size_t IMergedBlockOutputStream::writeSingleGranule(
    const String & name,
    const IDataType & type,
    const IColumn & column,
    WrittenOffsetColumns & offset_columns,
    bool skip_offsets,
    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
    IDataType::SerializeBinaryBulkSettings & serialize_settings,
    size_t from_row,
    size_t number_of_rows,
    bool write_marks)
{
    if (write_marks)
        writeSingleMark(name, type, offset_columns, skip_offsets, number_of_rows, serialize_settings.path);

    type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);

    /// So that marks point to the beginning of the next compressed block rather than to the end of the current one.
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets && skip_offsets)
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return;

        column_streams[stream_name]->compressed.nextIfAtEnd();
    }, serialize_settings.path);

    return from_row + number_of_rows;
}

/// column must not be empty. (column.size() != 0)

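/// Write the column granule by granule. Returns the mark following the last written one and the number
/// of rows of the last started granule that the next block is expected to fill (the new index_offset).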
std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
    const String & name,
    const IDataType & type,
    const IColumn & column,
    WrittenOffsetColumns & offset_columns,
    bool skip_offsets,
    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
    size_t from_mark)
{
    auto & settings = storage.global_context.getSettingsRef();
    IDataType::SerializeBinaryBulkSettings serialize_settings;
    serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;

    size_t total_rows = column.size();
    size_t current_row = 0;
    size_t current_column_mark = from_mark;
    while (current_row < total_rows)
    {
        size_t rows_to_write;
        bool write_marks = true;

        /// If there is `index_offset`, the first mark is written not immediately, but after that many rows.
        if (current_row == 0 && index_offset != 0)
        {
            write_marks = false;
            rows_to_write = index_offset;
        }
        else
        {
            if (index_granularity.getMarksCount() <= current_column_mark)
                throw Exception(
                    "Incorrect size of index granularity: expected mark " + toString(current_column_mark) + ", but there are only " + toString(index_granularity.getMarksCount()) + " marks in total",
                    ErrorCodes::LOGICAL_ERROR);

            rows_to_write = index_granularity.getMarkRows(current_column_mark);
        }

        current_row = writeSingleGranule(
            name,
            type,
            column,
            offset_columns,
            skip_offsets,
            serialization_state,
            serialize_settings,
            current_row,
            rows_to_write,
            write_marks
        );

        if (write_marks)
            current_column_mark++;
    }

    /// Memoize offsets for Nested types that are already written. They will not be written again for the next columns of the Nested structure.
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets)
        {
            String stream_name = IDataType::getFileNameForStream(name, substream_path);
            offset_columns.insert(stream_name);
        }
    }, serialize_settings.path);

    return std::make_pair(current_column_mark, current_row - total_rows);
}

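/// Write a final, zero-row mark pointing to the current end of every stream of the column
/// (used when with_final_mark is set).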
void IMergedBlockOutputStream::writeFinalMark(
    const std::string & column_name,
    const DataTypePtr column_type,
    WrittenOffsetColumns & offset_columns,
    bool skip_offsets,
    DB::IDataType::SubstreamPath & path)
{
    writeSingleMark(column_name, *column_type, offset_columns, skip_offsets, 0, path);
    /// Memoize information about offsets
    column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets)
        {
            String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
            offset_columns.insert(stream_name);
        }
    }, path);
}

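/// Create an output stream (.idx data plus marks), an empty aggregator and a zero fill counter for every skip index.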
void IMergedBlockOutputStream::initSkipIndices()
{
    for (const auto & index : skip_indices)
    {
        String stream_name = index->getFileName();
        skip_indices_streams.emplace_back(
            std::make_unique<ColumnStream>(
                stream_name,
                part_path + stream_name, INDEX_FILE_EXTENSION,
                part_path + stream_name, marks_file_extension,
                codec, max_compress_block_size,
                0, aio_threshold));
        skip_indices_aggregators.push_back(index->createIndexAggregator());
        skip_index_filling.push_back(0);
    }
}

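/// Feed the rows of the block into every skip index aggregator, granule by granule, mirroring writeColumn.
/// A mark is written whenever a new aggregator is started; the aggregated granule is serialized to the
/// index file once index->granularity data granules have been accumulated.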
void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
    const ColumnsWithTypeAndName & skip_indexes_columns, size_t rows)
{
    /// Creating block for update
    Block indices_update_block(skip_indexes_columns);
    size_t skip_index_current_data_mark = 0;

    /// Filling and writing skip indices like in IMergedBlockOutputStream::writeColumn
    for (size_t i = 0; i < skip_indices.size(); ++i)
    {
        const auto index = skip_indices[i];
        auto & stream = *skip_indices_streams[i];
        size_t prev_pos = 0;
        skip_index_current_data_mark = skip_index_data_mark;
        while (prev_pos < rows)
        {
            UInt64 limit = 0;
            if (prev_pos == 0 && index_offset != 0)
            {
                limit = index_offset;
            }
            else
            {
                limit = index_granularity.getMarkRows(skip_index_current_data_mark);
                if (skip_indices_aggregators[i]->empty())
                {
                    skip_indices_aggregators[i] = index->createIndexAggregator();
                    skip_index_filling[i] = 0;

                    if (stream.compressed.offset() >= min_compress_block_size)
                        stream.compressed.next();

                    writeIntBinary(stream.plain_hashing.count(), stream.marks);
                    writeIntBinary(stream.compressed.offset(), stream.marks);
                    /// Actually these numbers are redundant, but we have to store them
                    /// to be compatible with the normal .mrk2 file format.
                    if (can_use_adaptive_granularity)
                        writeIntBinary(1UL, stream.marks);
                }
                /// this mark is aggregated, go to the next one
                skip_index_current_data_mark++;
            }

            size_t pos = prev_pos;
            skip_indices_aggregators[i]->update(indices_update_block, &pos, limit);

            if (pos == prev_pos + limit)
            {
                ++skip_index_filling[i];

                /// write index if it is filled
                if (skip_index_filling[i] == index->granularity)
                {
                    skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
                    skip_index_filling[i] = 0;
                }
            }
            prev_pos = pos;
        }
    }
    skip_index_data_mark = skip_index_current_data_mark;
}

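/// Flush partially accumulated skip index granules, finalize the index streams and record their checksums.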
void IMergedBlockOutputStream::finishSkipIndicesSerialization(
    MergeTreeData::DataPart::Checksums & checksums)
{
    for (size_t i = 0; i < skip_indices.size(); ++i)
    {
        auto & stream = *skip_indices_streams[i];
        if (!skip_indices_aggregators[i]->empty())
            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
    }

    for (auto & stream : skip_indices_streams)
    {
        stream->finalize();
        stream->addToChecksums(checksums);
    }

    skip_indices_streams.clear();
    skip_indices_aggregators.clear();
    skip_index_filling.clear();
}

/// Implementation of IMergedBlockOutputStream::ColumnStream.

IMergedBlockOutputStream::ColumnStream::ColumnStream(
    const String & escaped_column_name_,
    const String & data_path_,
    const std::string & data_file_extension_,
    const std::string & marks_path_,
    const std::string & marks_file_extension_,
    const CompressionCodecPtr & compression_codec_,
    size_t max_compress_block_size_,
    size_t estimated_size_,
    size_t aio_threshold_) :
    escaped_column_name(escaped_column_name_),
    data_file_extension{data_file_extension_},
    marks_file_extension{marks_file_extension_},
    plain_file(createWriteBufferFromFileBase(data_path_ + data_file_extension, estimated_size_, aio_threshold_, max_compress_block_size_)),
    plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf),
    marks_file(marks_path_ + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}

void IMergedBlockOutputStream::ColumnStream::finalize()
{
    compressed.next();
    plain_file->next();
    marks.next();
}

void IMergedBlockOutputStream::ColumnStream::sync()
{
    plain_file->sync();
    marks_file.sync();
}

void IMergedBlockOutputStream::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
    String name = escaped_column_name;

    checksums.files[name + data_file_extension].is_compressed = true;
    checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
    checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
    checksums.files[name + data_file_extension].file_size = plain_hashing.count();
    checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();

    checksums.files[name + marks_file_extension].file_size = marks.count();
    checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}

}