1 | #include <DataStreams/MergeSortingBlockInputStream.h> |
2 | #include <DataStreams/MergingSortedBlockInputStream.h> |
3 | #include <DataStreams/NativeBlockOutputStream.h> |
4 | #include <DataStreams/TemporaryFileStream.h> |
5 | #include <DataStreams/processConstants.h> |
6 | #include <Common/formatReadable.h> |
7 | #include <IO/WriteBufferFromFile.h> |
8 | #include <Compression/CompressedWriteBuffer.h> |
9 | #include <Interpreters/sortBlock.h> |
10 | |
11 | |
12 | namespace ProfileEvents |
13 | { |
14 | extern const Event ExternalSortWritePart; |
15 | extern const Event ExternalSortMerge; |
16 | } |
17 | |
18 | namespace DB |
19 | { |
20 | |
21 | MergeSortingBlockInputStream::MergeSortingBlockInputStream( |
22 | const BlockInputStreamPtr & input, SortDescription & description_, |
23 | size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_, |
24 | size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_) |
25 | : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), |
26 | max_bytes_before_remerge(max_bytes_before_remerge_), |
27 | max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_), |
28 | min_free_disk_space(min_free_disk_space_) |
29 | { |
30 | children.push_back(input); |
31 | header = children.at(0)->getHeader(); |
32 | header_without_constants = header; |
33 | removeConstantsFromBlock(header_without_constants); |
34 | removeConstantsFromSortDescription(header, description); |
35 | } |
36 | |
37 | |
38 | Block MergeSortingBlockInputStream::readImpl() |
39 | { |
40 | /** Algorithm: |
41 | * - read to memory blocks from source stream; |
42 | * - if too many of them and if external sorting is enabled, |
43 | * - merge all blocks to sorted stream and write it to temporary file; |
44 | * - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory. |
45 | */ |
46 | |
47 | /// If has not read source blocks. |
48 | if (!impl) |
49 | { |
50 | while (Block block = children.back()->read()) |
51 | { |
52 | /// If there were only const columns in sort description, then there is no need to sort. |
53 | /// Return the blocks as is. |
54 | if (description.empty()) |
55 | return block; |
56 | |
57 | removeConstantsFromBlock(block); |
58 | |
59 | blocks.push_back(block); |
60 | sum_rows_in_blocks += block.rows(); |
61 | sum_bytes_in_blocks += block.allocatedBytes(); |
62 | |
63 | /** If significant amount of data was accumulated, perform preliminary merging step. |
64 | */ |
65 | if (blocks.size() > 1 |
66 | && limit |
67 | && limit * 2 < sum_rows_in_blocks /// 2 is just a guess. |
68 | && remerge_is_useful |
69 | && max_bytes_before_remerge |
70 | && sum_bytes_in_blocks > max_bytes_before_remerge) |
71 | { |
72 | remerge(); |
73 | } |
74 | |
75 | /** If too many of them and if external sorting is enabled, |
76 | * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file. |
77 | * NOTE. It's possible to check free space in filesystem. |
78 | */ |
79 | if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) |
80 | { |
81 | if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space)) |
82 | throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); |
83 | |
84 | temporary_files.emplace_back(createTemporaryFile(tmp_path)); |
85 | const std::string & path = temporary_files.back()->path(); |
86 | MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit); |
87 | |
88 | LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); |
89 | ProfileEvents::increment(ProfileEvents::ExternalSortWritePart); |
90 | TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage. |
91 | LOG_INFO(log, "Done writing part of data into temporary file " + path); |
92 | |
93 | blocks.clear(); |
94 | sum_bytes_in_blocks = 0; |
95 | sum_rows_in_blocks = 0; |
96 | } |
97 | } |
98 | |
99 | if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled()) |
100 | return Block(); |
101 | |
102 | if (temporary_files.empty()) |
103 | { |
104 | impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit); |
105 | } |
106 | else |
107 | { |
108 | /// If there was temporary files. |
109 | ProfileEvents::increment(ProfileEvents::ExternalSortMerge); |
110 | |
111 | LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge." ); |
112 | |
113 | /// Create sorted streams to merge. |
114 | for (const auto & file : temporary_files) |
115 | { |
116 | temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header_without_constants)); |
117 | inputs_to_merge.emplace_back(temporary_inputs.back()->block_in); |
118 | } |
119 | |
120 | /// Rest of blocks in memory. |
121 | if (!blocks.empty()) |
122 | inputs_to_merge.emplace_back(std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit)); |
123 | |
124 | /// Will merge that sorted streams. |
125 | impl = std::make_unique<MergingSortedBlockInputStream>(inputs_to_merge, description, max_merged_block_size, limit); |
126 | } |
127 | } |
128 | |
129 | Block res = impl->read(); |
130 | if (res) |
131 | enrichBlockWithConstants(res, header); |
132 | return res; |
133 | } |
134 | |
135 | |
136 | MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( |
137 | Blocks & blocks_, const SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_) |
138 | : blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) |
139 | { |
140 | Blocks nonempty_blocks; |
141 | for (const auto & block : blocks) |
142 | { |
143 | if (block.rows() == 0) |
144 | continue; |
145 | |
146 | nonempty_blocks.push_back(block); |
147 | cursors.emplace_back(block, description); |
148 | has_collation |= cursors.back().has_collation; |
149 | } |
150 | |
151 | blocks.swap(nonempty_blocks); |
152 | |
153 | if (!has_collation) |
154 | queue_without_collation = SortingHeap<SortCursor>(cursors); |
155 | else |
156 | queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors); |
157 | } |
158 | |
159 | |
160 | Block MergeSortingBlocksBlockInputStream::readImpl() |
161 | { |
162 | if (blocks.empty()) |
163 | return Block(); |
164 | |
165 | if (blocks.size() == 1) |
166 | { |
167 | Block res = blocks[0]; |
168 | blocks.clear(); |
169 | return res; |
170 | } |
171 | |
172 | return !has_collation |
173 | ? mergeImpl(queue_without_collation) |
174 | : mergeImpl(queue_with_collation); |
175 | } |
176 | |
177 | |
178 | template <typename TSortingHeap> |
179 | Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) |
180 | { |
181 | size_t num_columns = header.columns(); |
182 | |
183 | MutableColumns merged_columns = header.cloneEmptyColumns(); |
184 | /// TODO: reserve (in each column) |
185 | |
186 | /// Take rows from queue in right order and push to 'merged'. |
187 | size_t merged_rows = 0; |
188 | while (queue.isValid()) |
189 | { |
190 | auto current = queue.current(); |
191 | |
192 | /// Append a row from queue. |
193 | for (size_t i = 0; i < num_columns; ++i) |
194 | merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); |
195 | |
196 | ++total_merged_rows; |
197 | ++merged_rows; |
198 | |
199 | /// We don't need more rows because of limit has reached. |
200 | if (limit && total_merged_rows == limit) |
201 | { |
202 | blocks.clear(); |
203 | break; |
204 | } |
205 | |
206 | queue.next(); |
207 | |
208 | /// It's enough for current output block but we will continue. |
209 | if (merged_rows == max_merged_block_size) |
210 | break; |
211 | } |
212 | |
213 | if (merged_rows == 0) |
214 | return {}; |
215 | |
216 | return header.cloneWithColumns(std::move(merged_columns)); |
217 | } |
218 | |
219 | |
220 | void MergeSortingBlockInputStream::remerge() |
221 | { |
222 | LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << blocks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption" ); |
223 | |
224 | /// NOTE Maybe concat all blocks and partial sort will be faster than merge? |
225 | MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit); |
226 | |
227 | Blocks new_blocks; |
228 | size_t new_sum_rows_in_blocks = 0; |
229 | size_t new_sum_bytes_in_blocks = 0; |
230 | |
231 | merger.readPrefix(); |
232 | while (Block block = merger.read()) |
233 | { |
234 | new_sum_rows_in_blocks += block.rows(); |
235 | new_sum_bytes_in_blocks += block.allocatedBytes(); |
236 | new_blocks.emplace_back(std::move(block)); |
237 | } |
238 | merger.readSuffix(); |
239 | |
240 | LOG_DEBUG(log, "Memory usage is lowered from " |
241 | << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to " |
242 | << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks)); |
243 | |
244 | /// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess. |
245 | if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks) |
246 | remerge_is_useful = false; |
247 | |
248 | blocks = std::move(new_blocks); |
249 | sum_rows_in_blocks = new_sum_rows_in_blocks; |
250 | sum_bytes_in_blocks = new_sum_bytes_in_blocks; |
251 | } |
252 | } |
253 | |