1#include <DataStreams/MergeSortingBlockInputStream.h>
2#include <DataStreams/MergingSortedBlockInputStream.h>
3#include <DataStreams/NativeBlockOutputStream.h>
4#include <DataStreams/TemporaryFileStream.h>
5#include <DataStreams/processConstants.h>
6#include <Common/formatReadable.h>
7#include <IO/WriteBufferFromFile.h>
8#include <Compression/CompressedWriteBuffer.h>
9#include <Interpreters/sortBlock.h>
10
11
/// Profile event counters used by this file. They are only declared here (`extern`);
/// their definitions live elsewhere.
namespace ProfileEvents
{
    /// Incremented each time the in-memory blocks are sorted and written to a temporary file.
    extern const Event ExternalSortWritePart;
    /// Incremented when the final merge has to include sorted parts from temporary files.
    extern const Event ExternalSortMerge;
}
17
18namespace DB
19{
20
21MergeSortingBlockInputStream::MergeSortingBlockInputStream(
22 const BlockInputStreamPtr & input, SortDescription & description_,
23 size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
24 size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
25 : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
26 max_bytes_before_remerge(max_bytes_before_remerge_),
27 max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
28 min_free_disk_space(min_free_disk_space_)
29{
30 children.push_back(input);
31 header = children.at(0)->getHeader();
32 header_without_constants = header;
33 removeConstantsFromBlock(header_without_constants);
34 removeConstantsFromSortDescription(header, description);
35}
36
37
38Block MergeSortingBlockInputStream::readImpl()
39{
40 /** Algorithm:
41 * - read to memory blocks from source stream;
42 * - if too many of them and if external sorting is enabled,
43 * - merge all blocks to sorted stream and write it to temporary file;
44 * - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory.
45 */
46
47 /// If has not read source blocks.
48 if (!impl)
49 {
50 while (Block block = children.back()->read())
51 {
52 /// If there were only const columns in sort description, then there is no need to sort.
53 /// Return the blocks as is.
54 if (description.empty())
55 return block;
56
57 removeConstantsFromBlock(block);
58
59 blocks.push_back(block);
60 sum_rows_in_blocks += block.rows();
61 sum_bytes_in_blocks += block.allocatedBytes();
62
63 /** If significant amount of data was accumulated, perform preliminary merging step.
64 */
65 if (blocks.size() > 1
66 && limit
67 && limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
68 && remerge_is_useful
69 && max_bytes_before_remerge
70 && sum_bytes_in_blocks > max_bytes_before_remerge)
71 {
72 remerge();
73 }
74
75 /** If too many of them and if external sorting is enabled,
76 * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
77 * NOTE. It's possible to check free space in filesystem.
78 */
79 if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
80 {
81 if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
82 throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
83
84 temporary_files.emplace_back(createTemporaryFile(tmp_path));
85 const std::string & path = temporary_files.back()->path();
86 MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
87
88 LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
89 ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
90 TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage.
91 LOG_INFO(log, "Done writing part of data into temporary file " + path);
92
93 blocks.clear();
94 sum_bytes_in_blocks = 0;
95 sum_rows_in_blocks = 0;
96 }
97 }
98
99 if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled())
100 return Block();
101
102 if (temporary_files.empty())
103 {
104 impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit);
105 }
106 else
107 {
108 /// If there was temporary files.
109 ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
110
111 LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
112
113 /// Create sorted streams to merge.
114 for (const auto & file : temporary_files)
115 {
116 temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header_without_constants));
117 inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
118 }
119
120 /// Rest of blocks in memory.
121 if (!blocks.empty())
122 inputs_to_merge.emplace_back(std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit));
123
124 /// Will merge that sorted streams.
125 impl = std::make_unique<MergingSortedBlockInputStream>(inputs_to_merge, description, max_merged_block_size, limit);
126 }
127 }
128
129 Block res = impl->read();
130 if (res)
131 enrichBlockWithConstants(res, header);
132 return res;
133}
134
135
136MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
137 Blocks & blocks_, const SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
138 : blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
139{
140 Blocks nonempty_blocks;
141 for (const auto & block : blocks)
142 {
143 if (block.rows() == 0)
144 continue;
145
146 nonempty_blocks.push_back(block);
147 cursors.emplace_back(block, description);
148 has_collation |= cursors.back().has_collation;
149 }
150
151 blocks.swap(nonempty_blocks);
152
153 if (!has_collation)
154 queue_without_collation = SortingHeap<SortCursor>(cursors);
155 else
156 queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
157}
158
159
160Block MergeSortingBlocksBlockInputStream::readImpl()
161{
162 if (blocks.empty())
163 return Block();
164
165 if (blocks.size() == 1)
166 {
167 Block res = blocks[0];
168 blocks.clear();
169 return res;
170 }
171
172 return !has_collation
173 ? mergeImpl(queue_without_collation)
174 : mergeImpl(queue_with_collation);
175}
176
177
178template <typename TSortingHeap>
179Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
180{
181 size_t num_columns = header.columns();
182
183 MutableColumns merged_columns = header.cloneEmptyColumns();
184 /// TODO: reserve (in each column)
185
186 /// Take rows from queue in right order and push to 'merged'.
187 size_t merged_rows = 0;
188 while (queue.isValid())
189 {
190 auto current = queue.current();
191
192 /// Append a row from queue.
193 for (size_t i = 0; i < num_columns; ++i)
194 merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
195
196 ++total_merged_rows;
197 ++merged_rows;
198
199 /// We don't need more rows because of limit has reached.
200 if (limit && total_merged_rows == limit)
201 {
202 blocks.clear();
203 break;
204 }
205
206 queue.next();
207
208 /// It's enough for current output block but we will continue.
209 if (merged_rows == max_merged_block_size)
210 break;
211 }
212
213 if (merged_rows == 0)
214 return {};
215
216 return header.cloneWithColumns(std::move(merged_columns));
217}
218
219
220void MergeSortingBlockInputStream::remerge()
221{
222 LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << blocks.size() << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
223
224 /// NOTE Maybe concat all blocks and partial sort will be faster than merge?
225 MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit);
226
227 Blocks new_blocks;
228 size_t new_sum_rows_in_blocks = 0;
229 size_t new_sum_bytes_in_blocks = 0;
230
231 merger.readPrefix();
232 while (Block block = merger.read())
233 {
234 new_sum_rows_in_blocks += block.rows();
235 new_sum_bytes_in_blocks += block.allocatedBytes();
236 new_blocks.emplace_back(std::move(block));
237 }
238 merger.readSuffix();
239
240 LOG_DEBUG(log, "Memory usage is lowered from "
241 << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
242 << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
243
244 /// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
245 if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
246 remerge_is_useful = false;
247
248 blocks = std::move(new_blocks);
249 sum_rows_in_blocks = new_sum_rows_in_blocks;
250 sum_bytes_in_blocks = new_sum_bytes_in_blocks;
251}
252}
253