#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <IO/createWriteBufferFromFileBase.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/NestedUtils.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/MemoryTracker.h>
#include <Poco/File.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}


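/// Writes a complete data part. This constructor is used when per-column sizes are not known
/// in advance (e.g. for INSERTs): compression block sizes and the direct I/O threshold are taken
/// from the global settings, and output streams are created for every column via addStreams.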
MergedBlockOutputStream::MergedBlockOutputStream(
    MergeTreeData & storage_,
    const String & part_path_,
    const NamesAndTypesList & columns_list_,
    CompressionCodecPtr default_codec_,
    bool blocks_are_granules_size_)
    : IMergedBlockOutputStream(
        storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
        storage_.global_context.getSettings().max_compress_block_size, default_codec_,
        storage_.global_context.getSettings().min_bytes_to_use_direct_io,
        blocks_are_granules_size_,
        std::vector<MergeTreeIndexPtr>(std::begin(storage_.skip_indices), std::end(storage_.skip_indices)),
        {})
    , columns_list(columns_list_)
{
    init();
    for (const auto & it : columns_list)
    {
        const auto columns = storage.getColumns();
        addStreams(part_path, it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec_), 0, false);
    }
}

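/// Variant that additionally receives the per-column sizes of the data to be written
/// (merged_column_to_size_, as accumulated by a merge). Their sum is passed to addStreams as an
/// estimate, so that direct I/O can be used for the output files once it exceeds aio_threshold_.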
MergedBlockOutputStream::MergedBlockOutputStream(
    MergeTreeData & storage_,
    const String & part_path_,
    const NamesAndTypesList & columns_list_,
    CompressionCodecPtr default_codec_,
    const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
    size_t aio_threshold_,
    bool blocks_are_granules_size_)
    : IMergedBlockOutputStream(
        storage_, part_path_, storage_.global_context.getSettings().min_compress_block_size,
        storage_.global_context.getSettings().max_compress_block_size, default_codec_,
        aio_threshold_, blocks_are_granules_size_,
        std::vector<MergeTreeIndexPtr>(std::begin(storage_.skip_indices), std::end(storage_.skip_indices)), {})
    , columns_list(columns_list_)
{
    init();

    /// If the total size of all columns exceeds the threshold, then we will use AIO.
    size_t total_size = 0;
    if (aio_threshold > 0)
    {
        for (const auto & it : columns_list)
        {
            auto it2 = merged_column_to_size_.find(it.name);
            if (it2 != merged_column_to_size_.end())
                total_size += it2->second;
        }
    }

    for (const auto & it : columns_list)
    {
        const auto columns = storage.getColumns();
        addStreams(part_path, it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec_), total_size, false);
    }
}

std::string MergedBlockOutputStream::getPartPath() const
{
    return part_path;
}

/// If data is pre-sorted.
void MergedBlockOutputStream::write(const Block & block)
{
    writeImpl(block, nullptr);
}

/** If the data is not sorted, but we have previously calculated the permutation that sorts it.
  * This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted one.
  */
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
{
    writeImpl(block, permutation);
}

void MergedBlockOutputStream::writeSuffix()
{
    throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}

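/// Finalizes the part: finishes the serialization of all columns (writing final marks if enabled),
/// flushes the primary index and skip indices, writes the per-part metadata files
/// (partition / minmax index, count.txt, ttl.txt, columns.txt, checksums.txt)
/// and fills the corresponding fields of new_part.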
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
        MergeTreeData::MutableDataPartPtr & new_part,
        const NamesAndTypesList * total_column_list,
        MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
    /// Finish columns serialization.
    {
        auto & settings = storage.global_context.getSettingsRef();
        IDataType::SerializeBinaryBulkSettings serialize_settings;
        serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
        serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
        WrittenOffsetColumns offset_columns;
        auto it = columns_list.begin();
        for (size_t i = 0; i < columns_list.size(); ++i, ++it)
        {
            if (!serialization_states.empty())
            {
                serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
                it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
            }

            if (with_final_mark && rows_count != 0)
                writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
        }
    }

    if (with_final_mark && rows_count != 0)
        index_granularity.appendMark(0); /// last mark

    if (!total_column_list)
        total_column_list = &columns_list;

    /// Finish write and get checksums.
    MergeTreeData::DataPart::Checksums checksums;

    if (additional_column_checksums)
        checksums = std::move(*additional_column_checksums);

    if (index_stream)
    {
        if (with_final_mark && rows_count != 0)
        {
            for (size_t j = 0; j < index_columns.size(); ++j)
            {
                auto & column = *last_index_row[j].column;
                index_columns[j]->insertFrom(column, 0); /// it has only one element
                last_index_row[j].type->serializeBinary(column, 0, *index_stream);
            }
            last_index_row.clear();
        }

        index_stream->next();
        checksums.files["primary.idx"].file_size = index_stream->count();
        checksums.files["primary.idx"].file_hash = index_stream->getHash();
        index_stream = nullptr;
    }

    for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
    {
        it->second->finalize();
        it->second->addToChecksums(checksums);
    }

    finishSkipIndicesSerialization(checksums);

    column_streams.clear();

    if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        new_part->partition.store(storage, part_path, checksums);
        if (new_part->minmax_idx.initialized)
            new_part->minmax_idx.store(storage, part_path, checksums);
        else if (rows_count)
            throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
                + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        WriteBufferFromFile count_out(part_path + "count.txt", 4096);
        HashingWriteBuffer count_out_hashing(count_out);
        writeIntText(rows_count, count_out_hashing);
        count_out_hashing.next();
        checksums.files["count.txt"].file_size = count_out_hashing.count();
        checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
    }

    if (!new_part->ttl_infos.empty())
    {
        /// Write a file with ttl infos in json format.
        WriteBufferFromFile out(part_path + "ttl.txt", 4096);
        HashingWriteBuffer out_hashing(out);
        new_part->ttl_infos.write(out_hashing);
        checksums.files["ttl.txt"].file_size = out_hashing.count();
        checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
    }

    {
        /// Write a file with a description of columns.
        WriteBufferFromFile out(part_path + "columns.txt", 4096);
        total_column_list->writeText(out);
    }

    {
        /// Write file with checksums.
        WriteBufferFromFile out(part_path + "checksums.txt", 4096);
        checksums.write(out);
    }

    new_part->rows_count = rows_count;
    new_part->modification_time = time(nullptr);
    new_part->columns = *total_column_list;
    new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
    new_part->checksums = checksums;
    new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
    new_part->index_granularity = index_granularity;
}

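/// Creates the directory of the part, opens the primary.idx stream if the table has a primary key,
/// and initializes the streams for skip indices.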
void MergedBlockOutputStream::init()
{
    Poco::File(part_path).createDirectories();

    if (storage.hasPrimaryKey())
    {
        index_file_stream = std::make_unique<WriteBufferFromFile>(
            part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
        index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
    }

    initSkipIndices();
}


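/// Writes a single block, optionally applying a precalculated sorting permutation.
/// Columns are written granule by granule according to index_granularity; for the first row of each
/// granule, the primary key values are also appended to the in-memory index and to primary.idx.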
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
    block.checkNumberOfRows();
    size_t rows = block.rows();
    if (!rows)
        return;

    /// Fill index granularity for this block
    /// if it's unknown (in case of insert data or horizontal merge,
    /// but not in case of vertical merge)
    if (compute_granularity)
        fillIndexGranularity(block);

    /// The set of written offset columns, so that shared offsets of nested structures are not written several times.
    WrittenOffsetColumns offset_columns;

    auto primary_key_column_names = storage.primary_key_columns;
    std::set<String> skip_indexes_column_names_set;
    for (const auto & index : storage.skip_indices)
        std::copy(index->columns.cbegin(), index->columns.cend(),
                std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
    Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());

    /// Here we will add the columns related to the Primary Key, then write the index.
    std::vector<ColumnWithTypeAndName> primary_key_columns(primary_key_column_names.size());
    std::map<String, size_t> primary_key_column_name_to_position;

    for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
    {
        const auto & name = primary_key_column_names[i];

        if (!primary_key_column_name_to_position.emplace(name, i).second)
            throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);

        primary_key_columns[i] = block.getByName(name);

        /// Reorder primary key columns in advance and add them to `primary_key_columns`.
        if (permutation)
            primary_key_columns[i].column = primary_key_columns[i].column->permute(*permutation, 0);
    }

    /// The same for skip indexes columns.
    std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size());
    std::map<String, size_t> skip_indexes_column_name_to_position;

    for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
    {
        const auto & name = skip_indexes_column_names[i];
        skip_indexes_column_name_to_position.emplace(name, i);
        skip_indexes_columns[i] = block.getByName(name);

        /// Reorder index columns in advance.
        if (permutation)
            skip_indexes_columns[i].column = skip_indexes_columns[i].column->permute(*permutation, 0);
    }

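    /// On the first block, allocate the accumulators: index_columns collects the in-memory primary index,
    /// last_index_row keeps the last primary key row, used later to write the final mark.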
    if (index_columns.empty())
    {
        index_columns.resize(primary_key_column_names.size());
        last_index_row.resize(primary_key_column_names.size());
        for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
        {
            index_columns[i] = primary_key_columns[i].column->cloneEmpty();
            last_index_row[i] = primary_key_columns[i].cloneEmpty();
        }
    }

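    /// On the first block, create a serialization state per column by writing the bulk-state prefix
    /// (needed, for example, by LowCardinality columns that may share a dictionary across the part).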
    if (serialization_states.empty())
    {
        serialization_states.reserve(columns_list.size());
        WrittenOffsetColumns tmp_offset_columns;
        IDataType::SerializeBinaryBulkSettings settings;

        for (const auto & col : columns_list)
        {
            settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
            serialization_states.emplace_back(nullptr);
            col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
        }
    }

    size_t new_index_offset = 0;
    /// Now write the data.
    auto it = columns_list.begin();
    for (size_t i = 0; i < columns_list.size(); ++i, ++it)
    {
        const ColumnWithTypeAndName & column = block.getByName(it->name);

        if (permutation)
        {
            auto primary_column_it = primary_key_column_name_to_position.find(it->name);
            auto skip_index_column_it = skip_indexes_column_name_to_position.find(it->name);
            if (primary_key_column_name_to_position.end() != primary_column_it)
            {
                const auto & primary_column = *primary_key_columns[primary_column_it->second].column;
                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_mark);
            }
            else if (skip_indexes_column_name_to_position.end() != skip_index_column_it)
            {
                const auto & index_column = *skip_indexes_columns[skip_index_column_it->second].column;
                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
            }
            else
            {
                /// We rearrange the columns that are not included in the primary key here; then the result is released to save RAM.
                ColumnPtr permuted_column = column.column->permute(*permutation, 0);
                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_mark);
            }
        }
        else
        {
            std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_mark);
        }
    }

    rows_count += rows;

    /// Should be written before the index offset update, because we calculate
    /// the indices of currently written granules.
    calculateAndSerializeSkipIndices(skip_indexes_columns, rows);

    {
        /** While filling index (index_columns), disable memory tracker.
          * Because memory is allocated here (maybe in the context of an INSERT query),
          * but then freed in a completely different place (while merging parts), where the query memory_tracker is not available.
          * Otherwise it would look like excessively growing memory consumption in the context of the query
          * (observed in long INSERT SELECTs).
          */
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Write index. The index contains the Primary Key value for every `index_granularity` rows.
        for (size_t i = index_offset; i < rows;)
        {
            if (storage.hasPrimaryKey())
            {
                for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
                {
                    const IColumn & primary_column = *primary_key_columns[j].column.get();
                    index_columns[j]->insertFrom(primary_column, i);
                    primary_key_columns[j].type->serializeBinary(primary_column, i, *index_stream);
                }
            }

            ++current_mark;
            if (current_mark < index_granularity.getMarksCount())
                i += index_granularity.getMarkRows(current_mark);
            else
                break;
        }
    }

    /// Store the last index row to write the final mark at the end of the column.
    for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
    {
        const IColumn & primary_column = *primary_key_columns[j].column.get();
        auto mutable_column = std::move(*last_index_row[j].column).mutate();
        if (!mutable_column->empty())
            mutable_column->popBack(1);
        mutable_column->insertFrom(primary_column, rows - 1);
        last_index_row[j].column = std::move(mutable_column);
    }

    index_offset = new_index_offset;
}

}