#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}

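/// Writes a subset of a part's columns into an existing part directory.
/// Used when individual columns are rewritten (e.g. by mutations) and the
/// skip indices that depend on them have to be recalculated.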
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
    MergeTreeData & storage_, const Block & header_, const String & part_path_, bool sync_,
    CompressionCodecPtr default_codec_, bool skip_offsets_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    WrittenOffsetColumns & already_written_offset_columns_,
    const MergeTreeIndexGranularity & index_granularity_,
    const MergeTreeIndexGranularityInfo * index_granularity_info_)
    : IMergedBlockOutputStream(
        storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
        storage_.global_context.getSettings().max_compress_block_size, default_codec_,
        storage_.global_context.getSettings().min_bytes_to_use_direct_io,
        false, /// blocks_are_granules_size
        indices_to_recalc_,
        index_granularity_,
        index_granularity_info_),
    header(header_), sync(sync_), skip_offsets(skip_offsets_),
    already_written_offset_columns(already_written_offset_columns_)
{
    serialization_states.reserve(header.columns());
    WrittenOffsetColumns tmp_offset_columns;
    IDataType::SerializeBinaryBulkSettings settings;

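    /// For every column in the header: create its output streams and initialize
    /// the per-column serialization state with a bulk-write prefix.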
    const auto columns = storage.getColumns();
    for (const auto & column_name : header.getNames())
    {
        const auto & col = header.getByName(column_name);

        addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets);
        serialization_states.emplace_back(nullptr);
        settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
        col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
    }

    initSkipIndices();
}

void MergedColumnOnlyOutputStream::write(const Block & block)
{
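    /// Collect the distinct names of the columns used by the skip indices
    /// that have to be recalculated.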
    std::set<String> skip_indexes_column_names_set;
    for (const auto & index : skip_indices)
        std::copy(index->columns.cbegin(), index->columns.cend(),
                  std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
    Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());

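    /// Pull those columns out of the incoming block, remembering their positions.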
    std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size());
    std::map<String, size_t> skip_indexes_column_name_to_position;
    for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
    {
        const auto & name = skip_indexes_column_names[i];
        skip_indexes_column_name_to_position.emplace(name, i);
        skip_indexes_columns[i] = block.getByName(name);
    }

    size_t rows = block.rows();
    if (!rows)
        return;

    size_t new_index_offset = 0;
    size_t new_current_mark = 0;
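    /// Write each column of the block, continuing from the serialization state
    /// initialized in the constructor, and track where the next mark will begin.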
    WrittenOffsetColumns offset_columns = already_written_offset_columns;
    for (size_t i = 0; i < header.columns(); ++i)
    {
        const ColumnWithTypeAndName & column = block.getByName(header.getByPosition(i).name);
        std::tie(new_current_mark, new_index_offset) = writeColumn(
            column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
    }

    /// Must happen before the index offset is updated, because it relies on
    /// the indices of the granules written above.
    calculateAndSerializeSkipIndices(skip_indexes_columns, rows);

    index_offset = new_index_offset;
    current_mark = new_current_mark;
}

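/// Finishing is done via writeSuffixAndGetChecksums(); a plain writeSuffix()
/// would discard the checksums, so it is intentionally unsupported.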
void MergedColumnOnlyOutputStream::writeSuffix()
{
    throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}

MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
{
    /// Finish columns serialization.
    auto & settings = storage.global_context.getSettingsRef();
    IDataType::SerializeBinaryBulkSettings serialize_settings;
    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;

    WrittenOffsetColumns offset_columns;
    for (size_t i = 0, size = header.columns(); i < size; ++i)
    {
        auto & column = header.getByPosition(i);
        serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
        column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);

        /// Write the final mark only if we wrote at least one row.
        if (with_final_mark && (index_offset != 0 || current_mark != 0))
            writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path);
    }

    MergeTreeData::DataPart::Checksums checksums;

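    /// Flush every column stream (fsync if requested) and record its files
    /// in the part checksums.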
    for (auto & column_stream : column_streams)
    {
        column_stream.second->finalize();
        if (sync)
            column_stream.second->sync();

        column_stream.second->addToChecksums(checksums);
    }

    finishSkipIndicesSerialization(checksums);

    column_streams.clear();
    serialization_states.clear();

    return checksums;
}

}