1 | #pragma once |
2 | |
3 | #include <Storages/MergeTree/MergeTreeIndexGranularity.h> |
4 | #include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h> |
5 | #include <IO/WriteBufferFromFile.h> |
6 | #include <Compression/CompressedWriteBuffer.h> |
7 | #include <IO/HashingWriteBuffer.h> |
8 | #include <Storages/MergeTree/MergeTreeData.h> |
9 | #include <DataStreams/IBlockOutputStream.h> |
10 | |
11 | |
12 | namespace DB |
13 | { |
14 | |
/// Common base for merged-block output streams; extends IBlockOutputStream.
/// This header only declares the shared pieces: per-stream file buffers (ColumnStream),
/// helpers that write column data, granules and marks, hooks for skip indices, and the
/// state those helpers operate on. Every function declared here is defined elsewhere.
15 | class IMergedBlockOutputStream : public IBlockOutputStream |
16 | { |
17 | public: |
/// Constructor (defined outside this header).
/// `index_granularity_info_` is optional and defaults to nullptr.
/// NOTE(review): how a null `index_granularity_info_` is handled is decided in the definition - confirm there.
/// NOTE(review): `indices_to_recalc` is the only parameter without the trailing underscore the others use.
18 | IMergedBlockOutputStream( |
19 | MergeTreeData & storage_, |
20 | const String & part_path_, |
21 | size_t min_compress_block_size_, |
22 | size_t max_compress_block_size_, |
23 | CompressionCodecPtr default_codec_, |
24 | size_t aio_threshold_, |
25 | bool blocks_are_granules_size_, |
26 | const std::vector<MergeTreeIndexPtr> & indices_to_recalc, |
27 | const MergeTreeIndexGranularity & index_granularity_, |
28 | const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); |
29 | |
/// Set of names that is threaded by reference through the write* helpers below.
/// NOTE(review): presumably the offset streams already written, so shared offsets are emitted once - confirm in the definitions.
30 | using WrittenOffsetColumns = std::set<std::string>; |
31 | |
32 | protected: |
/// Shorthand for IDataType's bulk-serialization state pointer, and for a vector of such states.
33 | using SerializationState = IDataType::SerializeBinaryBulkStatePtr; |
34 | using SerializationStates = std::vector<SerializationState>; |
35 | |
/// Bundle of write buffers for one stream: a data file written through compression and
/// hashing layers, plus a marks file written through a hashing layer (see the chains below).
36 | struct ColumnStream |
37 | { |
/// Constructor (defined outside this header).
/// NOTE(review): the path and extension arguments presumably combine into the data and marks file names - confirm.
38 | ColumnStream( |
39 | const String & escaped_column_name_, |
40 | const String & data_path_, |
41 | const std::string & data_file_extension_, |
42 | const std::string & marks_path_, |
43 | const std::string & marks_file_extension_, |
44 | const CompressionCodecPtr & compression_codec_, |
45 | size_t max_compress_block_size_, |
46 | size_t estimated_size_, |
47 | size_t aio_threshold_); |
48 | |
49 | String escaped_column_name; |
50 | std::string data_file_extension; |
51 | std::string marks_file_extension; |
52 | |
/// The four members below are declared in the reverse of the chain (innermost sink first).
/// C++ constructs members in declaration order, so keep this order unless the constructor
/// is checked. NOTE(review): the constructor is not visible here - confirm before reordering.
53 | /// compressed -> compressed_buf -> plain_hashing -> plain_file |
54 | std::unique_ptr<WriteBufferFromFileBase> plain_file; |
55 | HashingWriteBuffer plain_hashing; |
56 | CompressedWriteBuffer compressed_buf; |
57 | HashingWriteBuffer compressed; |
58 | |
/// Same ordering consideration as above: the file is declared before the buffer that wraps it.
59 | /// marks -> marks_file |
60 | WriteBufferFromFile marks_file; |
61 | HashingWriteBuffer marks; |
62 | |
/// NOTE(review): presumably flushes the buffer chains down to the files - confirm in the definition.
63 | void finalize(); |
64 | |
/// NOTE(review): presumably forces written data to disk - confirm in the definition.
65 | void sync(); |
66 | |
/// Takes the part's checksums object by non-const reference, so it may modify it.
/// NOTE(review): presumably records what the hashing buffers accumulated - confirm.
67 | void addToChecksums(MergeTreeData::DataPart::Checksums & checksums); |
68 | }; |
69 | |
/// Owning map from a String key to its ColumnStream.
70 | using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>; |
71 | |
/// NOTE(review): presumably creates the ColumnStream entries needed for column `name` of `type` - confirm in the definition.
72 | void addStreams(const String & path, const String & name, const IDataType & type, |
73 | const CompressionCodecPtr & codec, size_t estimated_size, bool skip_offsets); |
74 | |
75 | |
/// Builds the IDataType::OutputStreamGetter callback for column `name`.
/// `offset_columns` is taken by non-const reference, so the getter or this call may update it.
76 | IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets); |
77 | |
78 | /// Write data of one column. |
79 | /// Return how many marks were written and |
80 | /// how many rows were written for last mark |
81 | std::pair<size_t, size_t> writeColumn( |
82 | const String & name, |
83 | const IDataType & type, |
84 | const IColumn & column, |
85 | WrittenOffsetColumns & offset_columns, |
86 | bool skip_offsets, |
87 | IDataType::SerializeBinaryBulkStatePtr & serialization_state, |
88 | size_t from_mark |
89 | ); |
90 | |
/// NOTE(review): the meaning of the returned size_t is not documented in this header - confirm in the definition.
91 | /// Write single granule of one column (rows between 2 marks) |
92 | size_t writeSingleGranule( |
93 | const String & name, |
94 | const IDataType & type, |
95 | const IColumn & column, |
96 | WrittenOffsetColumns & offset_columns, |
97 | bool skip_offsets, |
98 | IDataType::SerializeBinaryBulkStatePtr & serialization_state, |
99 | IDataType::SerializeBinaryBulkSettings & serialize_settings, |
100 | size_t from_row, |
101 | size_t number_of_rows, |
102 | bool write_marks); |
103 | |
104 | /// Write mark for column |
105 | void writeSingleMark( |
106 | const String & name, |
107 | const IDataType & type, |
108 | WrittenOffsetColumns & offset_columns, |
109 | bool skip_offsets, |
110 | size_t number_of_rows, |
111 | DB::IDataType::SubstreamPath & path); |
112 | |
113 | /// Count index_granularity for block and store in `index_granularity` |
114 | void fillIndexGranularity(const Block & block); |
115 | |
/// NOTE(review): `column_type` is taken by value (const DataTypePtr), unlike the
/// const-reference parameters used by the other helpers in this class.
116 | /// Write final mark to the end of column |
117 | void writeFinalMark( |
118 | const std::string & column_name, |
119 | const DataTypePtr column_type, |
120 | WrittenOffsetColumns & offset_columns, |
121 | bool skip_offsets, |
122 | DB::IDataType::SubstreamPath & path); |
123 | |
/// Skip-index hooks. They operate on the skip_indices* members declared at the end of the class.
/// NOTE(review): presumably called in the order init -> calculateAndSerialize (per block) -> finish; confirm against callers.
124 | void initSkipIndices(); |
125 | void calculateAndSerializeSkipIndices(const ColumnsWithTypeAndName & skip_indexes_columns, size_t rows); |
126 | void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums); |
/// NOTE(review): this repeats the `protected:` already in effect; it only separates
/// the data members below from the helper declarations above.
127 | protected: |
/// Held by reference, so the referenced MergeTreeData must outlive this object.
128 | MergeTreeData & storage; |
129 | |
130 | SerializationStates serialization_states; |
131 | String part_path; |
132 | |
133 | ColumnStreams column_streams; |
134 | |
135 | /// The offset to the first row of the block for which you want to write the index. |
136 | size_t index_offset = 0; |
137 | |
138 | size_t min_compress_block_size; |
139 | size_t max_compress_block_size; |
140 | |
141 | size_t aio_threshold; |
142 | |
143 | size_t current_mark = 0; |
144 | |
145 | /// Number of mark in data from which skip indices have to start |
146 | /// aggregation. I.e. it's data mark number, not skip indices mark. |
147 | size_t skip_index_data_mark = 0; |
148 | |
/// The const members below have no default initializer, so the constructor must set them
/// in its initializer list; initialization follows this declaration order.
149 | const bool can_use_adaptive_granularity; |
150 | const std::string marks_file_extension; |
151 | const bool blocks_are_granules_size; |
152 | |
153 | MergeTreeIndexGranularity index_granularity; |
154 | |
155 | const bool compute_granularity; |
156 | CompressionCodecPtr codec; |
157 | |
/// State used by the skip-index hooks declared above.
/// NOTE(review): the vectors are presumably parallel, with one entry per skip index - confirm in the definitions.
158 | std::vector<MergeTreeIndexPtr> skip_indices; |
159 | std::vector<std::unique_ptr<ColumnStream>> skip_indices_streams; |
160 | MergeTreeIndexAggregators skip_indices_aggregators; |
161 | std::vector<size_t> skip_index_filling; |
162 | |
163 | const bool with_final_mark; |
164 | }; |
165 | |
166 | } |
167 | |