| 1 | #pragma once |
| 2 | |
| 3 | #include <Storages/MergeTree/IMergedBlockOutputStream.h> |
| 4 | #include <Columns/ColumnArray.h> |
| 5 | |
| 6 | |
| 7 | namespace DB |
| 8 | { |
| 9 | |
| 10 | /** To write one part. |
| 11 | * The data refers to one partition, and is written in one part. |
| 12 | */ |
| 13 | class MergedBlockOutputStream final : public IMergedBlockOutputStream |
| 14 | { |
| 15 | public: |
| 16 | MergedBlockOutputStream( |
| 17 | MergeTreeData & storage_, |
| 18 | const String & part_path_, |
| 19 | const NamesAndTypesList & columns_list_, |
| 20 | CompressionCodecPtr default_codec_, |
| 21 | bool blocks_are_granules_size_ = false); |
| 22 | |
| 23 | MergedBlockOutputStream( |
| 24 | MergeTreeData & storage_, |
| 25 | const String & part_path_, |
| 26 | const NamesAndTypesList & columns_list_, |
| 27 | CompressionCodecPtr default_codec_, |
| 28 | const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_, |
| 29 | size_t aio_threshold_, |
| 30 | bool blocks_are_granules_size_ = false); |
| 31 | |
| 32 | std::string getPartPath() const; |
| 33 | |
| 34 | Block () const override { return storage.getSampleBlock(); } |
| 35 | |
| 36 | /// If the data is pre-sorted. |
| 37 | void write(const Block & block) override; |
| 38 | |
| 39 | /** If the data is not sorted, but we have previously calculated the permutation, that will sort it. |
| 40 | * This method is used to save RAM, since you do not need to keep two blocks at once - the original one and the sorted one. |
| 41 | */ |
| 42 | void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation); |
| 43 | |
| 44 | void writeSuffix() override; |
| 45 | |
| 46 | /// Finilize writing part and fill inner structures |
| 47 | void writeSuffixAndFinalizePart( |
| 48 | MergeTreeData::MutableDataPartPtr & new_part, |
| 49 | const NamesAndTypesList * total_columns_list = nullptr, |
| 50 | MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); |
| 51 | |
| 52 | const MergeTreeIndexGranularity & getIndexGranularity() const |
| 53 | { |
| 54 | return index_granularity; |
| 55 | } |
| 56 | |
| 57 | private: |
| 58 | void init(); |
| 59 | |
| 60 | /** If `permutation` is given, it rearranges the values in the columns when writing. |
| 61 | * This is necessary to not keep the whole block in the RAM to sort it. |
| 62 | */ |
| 63 | void writeImpl(const Block & block, const IColumn::Permutation * permutation); |
| 64 | |
| 65 | private: |
| 66 | NamesAndTypesList columns_list; |
| 67 | |
| 68 | size_t rows_count = 0; |
| 69 | |
| 70 | std::unique_ptr<WriteBufferFromFile> index_file_stream; |
| 71 | std::unique_ptr<HashingWriteBuffer> index_stream; |
| 72 | MutableColumns index_columns; |
| 73 | /// Index columns values from the last row from the last block |
| 74 | /// It's written to index file in the `writeSuffixAndFinalizePart` method |
| 75 | ColumnsWithTypeAndName last_index_row; |
| 76 | }; |
| 77 | |
| 78 | } |
| 79 | |