| 1 | #include <Storages/MergeTree/MergedColumnOnlyOutputStream.h> | 
|---|
| 2 |  | 
|---|
| 3 | namespace DB | 
|---|
| 4 | { | 
|---|
| 5 |  | 
|---|
| 6 | MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( | 
|---|
| 7 | MergeTreeData & storage_, const Block & , const String & part_path_, bool sync_, | 
|---|
| 8 | CompressionCodecPtr default_codec_, bool skip_offsets_, | 
|---|
| 9 | const std::vector<MergeTreeIndexPtr> & indices_to_recalc_, | 
|---|
| 10 | WrittenOffsetColumns & already_written_offset_columns_, | 
|---|
| 11 | const MergeTreeIndexGranularity & index_granularity_, | 
|---|
| 12 | const MergeTreeIndexGranularityInfo * index_granularity_info_) | 
|---|
| 13 | : IMergedBlockOutputStream( | 
|---|
| 14 | storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size, | 
|---|
| 15 | storage_.global_context.getSettings().max_compress_block_size, default_codec_, | 
|---|
| 16 | storage_.global_context.getSettings().min_bytes_to_use_direct_io, | 
|---|
| 17 | false, | 
|---|
| 18 | indices_to_recalc_, | 
|---|
| 19 | index_granularity_, | 
|---|
| 20 | index_granularity_info_), | 
|---|
| 21 | header(header_), sync(sync_), skip_offsets(skip_offsets_), | 
|---|
| 22 | already_written_offset_columns(already_written_offset_columns_) | 
|---|
| 23 | { | 
|---|
| 24 | serialization_states.reserve(header.columns()); | 
|---|
| 25 | WrittenOffsetColumns tmp_offset_columns; | 
|---|
| 26 | IDataType::SerializeBinaryBulkSettings settings; | 
|---|
| 27 |  | 
|---|
| 28 | for (const auto & column_name : header.getNames()) | 
|---|
| 29 | { | 
|---|
| 30 | const auto & col = header.getByName(column_name); | 
|---|
| 31 |  | 
|---|
| 32 | const auto columns = storage.getColumns(); | 
|---|
| 33 | addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets); | 
|---|
| 34 | serialization_states.emplace_back(nullptr); | 
|---|
| 35 | settings.getter = createStreamGetter(col.name, tmp_offset_columns, false); | 
|---|
| 36 | col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back()); | 
|---|
| 37 | } | 
|---|
| 38 |  | 
|---|
| 39 | initSkipIndices(); | 
|---|
| 40 | } | 
|---|
| 41 |  | 
|---|
| 42 | void MergedColumnOnlyOutputStream::write(const Block & block) | 
|---|
| 43 | { | 
|---|
| 44 | std::set<String> skip_indexes_column_names_set; | 
|---|
| 45 | for (const auto & index : skip_indices) | 
|---|
| 46 | std::copy(index->columns.cbegin(), index->columns.cend(), | 
|---|
| 47 | std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end())); | 
|---|
| 48 | Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end()); | 
|---|
| 49 |  | 
|---|
| 50 | std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size()); | 
|---|
| 51 | std::map<String, size_t> skip_indexes_column_name_to_position; | 
|---|
| 52 | for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i) | 
|---|
| 53 | { | 
|---|
| 54 | const auto & name = skip_indexes_column_names[i]; | 
|---|
| 55 | skip_indexes_column_name_to_position.emplace(name, i); | 
|---|
| 56 | skip_indexes_columns[i] = block.getByName(name); | 
|---|
| 57 | } | 
|---|
| 58 |  | 
|---|
| 59 | size_t rows = block.rows(); | 
|---|
| 60 | if (!rows) | 
|---|
| 61 | return; | 
|---|
| 62 |  | 
|---|
| 63 | size_t new_index_offset = 0; | 
|---|
| 64 | size_t new_current_mark = 0; | 
|---|
| 65 | WrittenOffsetColumns offset_columns = already_written_offset_columns; | 
|---|
| 66 | for (size_t i = 0; i < header.columns(); ++i) | 
|---|
| 67 | { | 
|---|
| 68 | const ColumnWithTypeAndName & column = block.getByName(header.getByPosition(i).name); | 
|---|
| 69 | std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark); | 
|---|
| 70 | } | 
|---|
| 71 |  | 
|---|
| 72 | /// Should be written before index offset update, because we calculate, | 
|---|
| 73 | /// indices of currently written granules | 
|---|
| 74 | calculateAndSerializeSkipIndices(skip_indexes_columns, rows); | 
|---|
| 75 |  | 
|---|
| 76 | index_offset = new_index_offset; | 
|---|
| 77 | current_mark = new_current_mark; | 
|---|
| 78 | } | 
|---|
| 79 |  | 
|---|
| 80 | void MergedColumnOnlyOutputStream::writeSuffix() | 
|---|
| 81 | { | 
|---|
| 82 | throw Exception( "Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED); | 
|---|
| 83 | } | 
|---|
| 84 |  | 
|---|
| 85 | MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums() | 
|---|
| 86 | { | 
|---|
| 87 | /// Finish columns serialization. | 
|---|
| 88 | auto & settings = storage.global_context.getSettingsRef(); | 
|---|
| 89 | IDataType::SerializeBinaryBulkSettings serialize_settings; | 
|---|
| 90 | serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; | 
|---|
| 91 | serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; | 
|---|
| 92 |  | 
|---|
| 93 | WrittenOffsetColumns offset_columns; | 
|---|
| 94 | for (size_t i = 0, size = header.columns(); i < size; ++i) | 
|---|
| 95 | { | 
|---|
| 96 | auto & column = header.getByPosition(i); | 
|---|
| 97 | serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets); | 
|---|
| 98 | column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); | 
|---|
| 99 |  | 
|---|
| 100 | /// We wrote at least one row | 
|---|
| 101 | if (with_final_mark && (index_offset != 0 || current_mark != 0)) | 
|---|
| 102 | writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path); | 
|---|
| 103 | } | 
|---|
| 104 |  | 
|---|
| 105 | MergeTreeData::DataPart::Checksums checksums; | 
|---|
| 106 |  | 
|---|
| 107 | for (auto & column_stream : column_streams) | 
|---|
| 108 | { | 
|---|
| 109 | column_stream.second->finalize(); | 
|---|
| 110 | if (sync) | 
|---|
| 111 | column_stream.second->sync(); | 
|---|
| 112 |  | 
|---|
| 113 | column_stream.second->addToChecksums(checksums); | 
|---|
| 114 | } | 
|---|
| 115 |  | 
|---|
| 116 | finishSkipIndicesSerialization(checksums); | 
|---|
| 117 |  | 
|---|
| 118 | column_streams.clear(); | 
|---|
| 119 | serialization_states.clear(); | 
|---|
| 120 |  | 
|---|
| 121 | return checksums; | 
|---|
| 122 | } | 
|---|
| 123 |  | 
|---|
| 124 | } | 
|---|
| 125 |  | 
|---|