1#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
2
3namespace DB
4{
5
6MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
7 MergeTreeData & storage_, const Block & header_, const String & part_path_, bool sync_,
8 CompressionCodecPtr default_codec_, bool skip_offsets_,
9 const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
10 WrittenOffsetColumns & already_written_offset_columns_,
11 const MergeTreeIndexGranularity & index_granularity_,
12 const MergeTreeIndexGranularityInfo * index_granularity_info_)
13 : IMergedBlockOutputStream(
14 storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
15 storage_.global_context.getSettings().max_compress_block_size, default_codec_,
16 storage_.global_context.getSettings().min_bytes_to_use_direct_io,
17 false,
18 indices_to_recalc_,
19 index_granularity_,
20 index_granularity_info_),
21 header(header_), sync(sync_), skip_offsets(skip_offsets_),
22 already_written_offset_columns(already_written_offset_columns_)
23{
24 serialization_states.reserve(header.columns());
25 WrittenOffsetColumns tmp_offset_columns;
26 IDataType::SerializeBinaryBulkSettings settings;
27
28 for (const auto & column_name : header.getNames())
29 {
30 const auto & col = header.getByName(column_name);
31
32 const auto columns = storage.getColumns();
33 addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets);
34 serialization_states.emplace_back(nullptr);
35 settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
36 col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
37 }
38
39 initSkipIndices();
40}
41
42void MergedColumnOnlyOutputStream::write(const Block & block)
43{
44 std::set<String> skip_indexes_column_names_set;
45 for (const auto & index : skip_indices)
46 std::copy(index->columns.cbegin(), index->columns.cend(),
47 std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
48 Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
49
50 std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size());
51 std::map<String, size_t> skip_indexes_column_name_to_position;
52 for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
53 {
54 const auto & name = skip_indexes_column_names[i];
55 skip_indexes_column_name_to_position.emplace(name, i);
56 skip_indexes_columns[i] = block.getByName(name);
57 }
58
59 size_t rows = block.rows();
60 if (!rows)
61 return;
62
63 size_t new_index_offset = 0;
64 size_t new_current_mark = 0;
65 WrittenOffsetColumns offset_columns = already_written_offset_columns;
66 for (size_t i = 0; i < header.columns(); ++i)
67 {
68 const ColumnWithTypeAndName & column = block.getByName(header.getByPosition(i).name);
69 std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
70 }
71
72 /// Should be written before index offset update, because we calculate,
73 /// indices of currently written granules
74 calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
75
76 index_offset = new_index_offset;
77 current_mark = new_current_mark;
78}
79
/// Deliberately unsupported: callers must use writeSuffixAndGetChecksums()
/// instead, because this stream has to hand back checksums for the column
/// files it produced.
void MergedColumnOnlyOutputStream::writeSuffix()
{
    throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
84
85MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
86{
87 /// Finish columns serialization.
88 auto & settings = storage.global_context.getSettingsRef();
89 IDataType::SerializeBinaryBulkSettings serialize_settings;
90 serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
91 serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
92
93 WrittenOffsetColumns offset_columns;
94 for (size_t i = 0, size = header.columns(); i < size; ++i)
95 {
96 auto & column = header.getByPosition(i);
97 serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
98 column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
99
100 /// We wrote at least one row
101 if (with_final_mark && (index_offset != 0 || current_mark != 0))
102 writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path);
103 }
104
105 MergeTreeData::DataPart::Checksums checksums;
106
107 for (auto & column_stream : column_streams)
108 {
109 column_stream.second->finalize();
110 if (sync)
111 column_stream.second->sync();
112
113 column_stream.second->addToChecksums(checksums);
114 }
115
116 finishSkipIndicesSerialization(checksums);
117
118 column_streams.clear();
119 serialization_states.clear();
120
121 return checksums;
122}
123
124}
125