#include <Poco/Version.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/IAccumulatingTransform.h>
#include <Processors/Transforms/MergingSortedTransform.h>
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>


namespace ProfileEvents
{
    extern const Event ExternalSortWritePart;
    extern const Event ExternalSortMerge;
}


namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_ENOUGH_SPACE;
}

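/** Part of the external-sort implementation: accumulates the already-sorted
  * stream produced by MergeSorter, writes it into a compressed temporary file
  * in Native format, and then streams the blocks back from that file.
  */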
class BufferingToFileTransform : public IAccumulatingTransform
{
public:
    BufferingToFileTransform(const Block & header, Logger * log_, std::string path_)
        : IAccumulatingTransform(header, header), log(log_)
        , path(std::move(path_)), file_buf_out(path), compressed_buf_out(file_buf_out)
        , out_stream(std::make_shared<NativeBlockOutputStream>(compressed_buf_out, 0, header))
    {
        LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
        ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
        out_stream->writePrefix();
    }

    String getName() const override { return "BufferingToFileTransform"; }

    void consume(Chunk chunk) override
    {
        out_stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
    }

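    /// The first call finalizes the write phase and reopens the temporary file
    /// for reading; subsequent calls stream the sorted blocks back out.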
    Chunk generate() override
    {
        if (out_stream)
        {
            out_stream->writeSuffix();
            compressed_buf_out.next();
            file_buf_out.next();
            LOG_INFO(log, "Done writing part of data into temporary file " + path);

            out_stream.reset();

            file_in = std::make_unique<ReadBufferFromFile>(path);
            compressed_in = std::make_unique<CompressedReadBuffer>(*file_in);
            block_in = std::make_shared<NativeBlockInputStream>(*compressed_in, getOutputPort().getHeader(), 0);
        }

        if (!block_in)
            return {};

        auto block = block_in->read();
        if (!block)
        {
            block_in->readSuffix();
            block_in.reset();
            return {};
        }

        UInt64 num_rows = block.rows();
        return Chunk(block.getColumns(), num_rows);
    }

private:
    Logger * log;
    std::string path;
    WriteBufferFromFile file_buf_out;
    CompressedWriteBuffer compressed_buf_out;
    BlockOutputStreamPtr out_stream;

    std::unique_ptr<ReadBufferFromFile> file_in;
    std::unique_ptr<CompressedReadBuffer> compressed_in;
    BlockInputStreamPtr block_in;
};

MergeSortingTransform::MergeSortingTransform(
    const Block & header,
    const SortDescription & description_,
    size_t max_merged_block_size_, UInt64 limit_,
    size_t max_bytes_before_remerge_,
    size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
    size_t min_free_disk_space_)
    : SortingTransform(header, description_, max_merged_block_size_, limit_)
    , max_bytes_before_remerge(max_bytes_before_remerge_)
    , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
    , min_free_disk_space(min_free_disk_space_) {}

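/// Invoked when this processor asks the executor to expand the pipeline: wires
/// the newly created buffering processor and external_merging_sorted into this
/// transform's ports and hands the new processors over to the executor.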
Processors MergeSortingTransform::expandPipeline()
{
    if (processors.size() > 1)
    {
        /// Add external_merging_sorted.
        inputs.emplace_back(header_without_constants, this);
        connect(external_merging_sorted->getOutputs().front(), inputs.back());
    }

    auto & buffer = processors.front();

    static_cast<MergingSortedTransform &>(*external_merging_sorted).addInput();
    connect(buffer->getOutputs().back(), external_merging_sorted->getInputs().back());

    if (!buffer->getInputs().empty())
    {
        /// Serialize
        outputs.emplace_back(header_without_constants, this);
        connect(getOutputs().back(), buffer->getInputs().back());
        /// Hack. Tell the buffer that we need data from this port (otherwise it will return PortFull).
        external_merging_sorted->getInputs().back().setNeeded();
    }
    else
        /// Generate
        static_cast<MergingSortedTransform &>(*external_merging_sorted).setHaveAllInputs();

    return std::move(processors);
}

void MergeSortingTransform::consume(Chunk chunk)
{
    /** Algorithm:
      * - read blocks from the source stream into memory;
      * - if too many of them accumulate and external sorting is enabled,
      *   merge the in-memory blocks into a sorted stream and write it to a temporary file;
      * - at the end, merge all sorted streams from the temporary files, together with the rest of the blocks in memory.
      */

    /// If there were only const columns in sort description, then there is no need to sort.
    /// Return the chunk as is.
    if (description.empty())
    {
        generated_chunk = std::move(chunk);
        return;
    }

    removeConstColumns(chunk);

    sum_rows_in_blocks += chunk.getNumRows();
    sum_bytes_in_blocks += chunk.allocatedBytes();
    chunks.push_back(std::move(chunk));

    /** If a significant amount of data was accumulated, perform a preliminary merging step.
      */
    if (chunks.size() > 1
        && limit
        && limit * 2 < sum_rows_in_blocks   /// 2 is just a guess.
        && remerge_is_useful
        && max_bytes_before_remerge
        && sum_bytes_in_blocks > max_bytes_before_remerge)
    {
        remerge();
    }

    /** If too much data was accumulated and external sorting is enabled,
      * merge the blocks that we have in memory at this moment and write the merged stream to a temporary (compressed) file.
      */
    if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
    {
        if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
            throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);

        temporary_files.emplace_back(createTemporaryFile(tmp_path));
        const std::string & path = temporary_files.back()->path();
        merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
        auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);

        processors.emplace_back(current_processor);

        if (!external_merging_sorted)
        {
            bool quiet = false;
            bool have_all_inputs = false;

            external_merging_sorted = std::make_shared<MergingSortedTransform>(
                header_without_constants,
                0,    /// num_inputs starts at zero; inputs are added in expandPipeline() via addInput().
                description,
                max_merged_block_size,
                limit,
                quiet,
                have_all_inputs);

            processors.emplace_back(external_merging_sorted);
        }

        stage = Stage::Serialize;
        sum_bytes_in_blocks = 0;
        sum_rows_in_blocks = 0;
    }
}

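/// Serialize stage: feed merged chunks from merge_sorter to the output port,
/// where they are consumed by the BufferingToFileTransform added in consume().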
void MergeSortingTransform::serialize()
{
    current_chunk = merge_sorter->read();
    if (!current_chunk)
        merge_sorter.reset();
}

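/// Generate stage: either merge everything in memory (no temporary files were
/// created) or let external_merging_sorted merge the temporary files together
/// with the remaining in-memory chunks.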
void MergeSortingTransform::generate()
{
    if (!generated_prefix)
    {
        if (temporary_files.empty())
            merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
        else
        {
            ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
            LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");

            if (!chunks.empty())
                processors.emplace_back(std::make_shared<MergeSorterSource>(
                    header_without_constants, std::move(chunks), description, max_merged_block_size, limit));
        }

        generated_prefix = true;
    }

    if (merge_sorter)
    {
        generated_chunk = merge_sorter->read();
        if (!generated_chunk)
            merge_sorter.reset();
        else
            enrichChunkWithConstants(generated_chunk);
    }
}

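/// Preliminary merge of the in-memory chunks. Only reached when a LIMIT is set
/// (see the check in consume()), so merging can drop rows beyond the limit and
/// thereby lower memory consumption.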
void MergeSortingTransform::remerge()
{
    LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size()
        << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");

    /// NOTE: Maybe concatenating all blocks and doing a partial sort would be faster than merging?
    MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit);

    Chunks new_chunks;
    size_t new_sum_rows_in_blocks = 0;
    size_t new_sum_bytes_in_blocks = 0;

    while (auto chunk = remerge_sorter.read())
    {
        new_sum_rows_in_blocks += chunk.getNumRows();
        new_sum_bytes_in_blocks += chunk.allocatedBytes();
        new_chunks.emplace_back(std::move(chunk));
    }

    LOG_DEBUG(log, "Memory usage is lowered from "
        << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
        << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));

    /// If the memory consumption was not lowered enough, do not perform remerge anymore. 2 is a guess.
    if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
        remerge_is_useful = false;

    chunks = std::move(new_chunks);
    sum_rows_in_blocks = new_sum_rows_in_blocks;
    sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}

}