| 1 | #include <Storages/MergeTree/MergedColumnOnlyOutputStream.h> |
| 2 | |
| 3 | namespace DB |
| 4 | { |
| 5 | |
| 6 | MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( |
| 7 | MergeTreeData & storage_, const Block & , const String & part_path_, bool sync_, |
| 8 | CompressionCodecPtr default_codec_, bool skip_offsets_, |
| 9 | const std::vector<MergeTreeIndexPtr> & indices_to_recalc_, |
| 10 | WrittenOffsetColumns & already_written_offset_columns_, |
| 11 | const MergeTreeIndexGranularity & index_granularity_, |
| 12 | const MergeTreeIndexGranularityInfo * index_granularity_info_) |
| 13 | : IMergedBlockOutputStream( |
| 14 | storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size, |
| 15 | storage_.global_context.getSettings().max_compress_block_size, default_codec_, |
| 16 | storage_.global_context.getSettings().min_bytes_to_use_direct_io, |
| 17 | false, |
| 18 | indices_to_recalc_, |
| 19 | index_granularity_, |
| 20 | index_granularity_info_), |
| 21 | header(header_), sync(sync_), skip_offsets(skip_offsets_), |
| 22 | already_written_offset_columns(already_written_offset_columns_) |
| 23 | { |
| 24 | serialization_states.reserve(header.columns()); |
| 25 | WrittenOffsetColumns tmp_offset_columns; |
| 26 | IDataType::SerializeBinaryBulkSettings settings; |
| 27 | |
| 28 | for (const auto & column_name : header.getNames()) |
| 29 | { |
| 30 | const auto & col = header.getByName(column_name); |
| 31 | |
| 32 | const auto columns = storage.getColumns(); |
| 33 | addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets); |
| 34 | serialization_states.emplace_back(nullptr); |
| 35 | settings.getter = createStreamGetter(col.name, tmp_offset_columns, false); |
| 36 | col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back()); |
| 37 | } |
| 38 | |
| 39 | initSkipIndices(); |
| 40 | } |
| 41 | |
| 42 | void MergedColumnOnlyOutputStream::write(const Block & block) |
| 43 | { |
| 44 | std::set<String> skip_indexes_column_names_set; |
| 45 | for (const auto & index : skip_indices) |
| 46 | std::copy(index->columns.cbegin(), index->columns.cend(), |
| 47 | std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end())); |
| 48 | Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end()); |
| 49 | |
| 50 | std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size()); |
| 51 | std::map<String, size_t> skip_indexes_column_name_to_position; |
| 52 | for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i) |
| 53 | { |
| 54 | const auto & name = skip_indexes_column_names[i]; |
| 55 | skip_indexes_column_name_to_position.emplace(name, i); |
| 56 | skip_indexes_columns[i] = block.getByName(name); |
| 57 | } |
| 58 | |
| 59 | size_t rows = block.rows(); |
| 60 | if (!rows) |
| 61 | return; |
| 62 | |
| 63 | size_t new_index_offset = 0; |
| 64 | size_t new_current_mark = 0; |
| 65 | WrittenOffsetColumns offset_columns = already_written_offset_columns; |
| 66 | for (size_t i = 0; i < header.columns(); ++i) |
| 67 | { |
| 68 | const ColumnWithTypeAndName & column = block.getByName(header.getByPosition(i).name); |
| 69 | std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark); |
| 70 | } |
| 71 | |
| 72 | /// Should be written before index offset update, because we calculate, |
| 73 | /// indices of currently written granules |
| 74 | calculateAndSerializeSkipIndices(skip_indexes_columns, rows); |
| 75 | |
| 76 | index_offset = new_index_offset; |
| 77 | current_mark = new_current_mark; |
| 78 | } |
| 79 | |
| 80 | void MergedColumnOnlyOutputStream::writeSuffix() |
| 81 | { |
| 82 | throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream" , ErrorCodes::NOT_IMPLEMENTED); |
| 83 | } |
| 84 | |
| 85 | MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums() |
| 86 | { |
| 87 | /// Finish columns serialization. |
| 88 | auto & settings = storage.global_context.getSettingsRef(); |
| 89 | IDataType::SerializeBinaryBulkSettings serialize_settings; |
| 90 | serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; |
| 91 | serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; |
| 92 | |
| 93 | WrittenOffsetColumns offset_columns; |
| 94 | for (size_t i = 0, size = header.columns(); i < size; ++i) |
| 95 | { |
| 96 | auto & column = header.getByPosition(i); |
| 97 | serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets); |
| 98 | column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]); |
| 99 | |
| 100 | /// We wrote at least one row |
| 101 | if (with_final_mark && (index_offset != 0 || current_mark != 0)) |
| 102 | writeFinalMark(column.name, column.type, offset_columns, skip_offsets, serialize_settings.path); |
| 103 | } |
| 104 | |
| 105 | MergeTreeData::DataPart::Checksums checksums; |
| 106 | |
| 107 | for (auto & column_stream : column_streams) |
| 108 | { |
| 109 | column_stream.second->finalize(); |
| 110 | if (sync) |
| 111 | column_stream.second->sync(); |
| 112 | |
| 113 | column_stream.second->addToChecksums(checksums); |
| 114 | } |
| 115 | |
| 116 | finishSkipIndicesSerialization(checksums); |
| 117 | |
| 118 | column_streams.clear(); |
| 119 | serialization_states.clear(); |
| 120 | |
| 121 | return checksums; |
| 122 | } |
| 123 | |
| 124 | } |
| 125 | |