1#include <Poco/Version.h>
2#include <Processors/Transforms/MergeSortingTransform.h>
3#include <Processors/IAccumulatingTransform.h>
4#include <Processors/Transforms/MergingSortedTransform.h>
5#include <Common/formatReadable.h>
6#include <Common/ProfileEvents.h>
7#include <common/config_common.h>
8#include <IO/WriteBufferFromFile.h>
9#include <Compression/CompressedWriteBuffer.h>
10#include <DataStreams/NativeBlockInputStream.h>
11#include <DataStreams/NativeBlockOutputStream.h>
12
13
14namespace ProfileEvents
15{
16 extern const Event ExternalSortWritePart;
17 extern const Event ExternalSortMerge;
18}
19
20
21namespace DB
22{
23
24class BufferingToFileTransform : public IAccumulatingTransform
25{
26public:
27 BufferingToFileTransform(const Block & header, Logger * log_, std::string path_)
28 : IAccumulatingTransform(header, header), log(log_)
29 , path(std::move(path_)), file_buf_out(path), compressed_buf_out(file_buf_out)
30 , out_stream(std::make_shared<NativeBlockOutputStream>(compressed_buf_out, 0, header))
31 {
32 LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
33 ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
34 out_stream->writePrefix();
35 }
36
37 String getName() const override { return "BufferingToFileTransform"; }
38
39 void consume(Chunk chunk) override
40 {
41 out_stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
42 }
43
44 Chunk generate() override
45 {
46 if (out_stream)
47 {
48 out_stream->writeSuffix();
49 compressed_buf_out.next();
50 file_buf_out.next();
51 LOG_INFO(log, "Done writing part of data into temporary file " + path);
52
53 out_stream.reset();
54
55 file_in = std::make_unique<ReadBufferFromFile>(path);
56 compressed_in = std::make_unique<CompressedReadBuffer>(*file_in);
57 block_in = std::make_shared<NativeBlockInputStream>(*compressed_in, getOutputPort().getHeader(), 0);
58 }
59
60 if (!block_in)
61 return {};
62
63 auto block = block_in->read();
64 if (!block)
65 {
66 block_in->readSuffix();
67 block_in.reset();
68 return {};
69 }
70
71 UInt64 num_rows = block.rows();
72 return Chunk(block.getColumns(), num_rows);
73 }
74
75private:
76 Logger * log;
77 std::string path;
78 WriteBufferFromFile file_buf_out;
79 CompressedWriteBuffer compressed_buf_out;
80 BlockOutputStreamPtr out_stream;
81
82 std::unique_ptr<ReadBufferFromFile> file_in;
83 std::unique_ptr<CompressedReadBuffer> compressed_in;
84 BlockInputStreamPtr block_in;
85};
86
87MergeSortingTransform::MergeSortingTransform(
88 const Block & header,
89 const SortDescription & description_,
90 size_t max_merged_block_size_, UInt64 limit_,
91 size_t max_bytes_before_remerge_,
92 size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
93 size_t min_free_disk_space_)
94 : SortingTransform(header, description_, max_merged_block_size_, limit_)
95 , max_bytes_before_remerge(max_bytes_before_remerge_)
96 , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
97 , min_free_disk_space(min_free_disk_space_) {}
98
99Processors MergeSortingTransform::expandPipeline()
100{
101 if (processors.size() > 1)
102 {
103 /// Add external_merging_sorted.
104 inputs.emplace_back(header_without_constants, this);
105 connect(external_merging_sorted->getOutputs().front(), inputs.back());
106 }
107
108 auto & buffer = processors.front();
109
110 static_cast<MergingSortedTransform &>(*external_merging_sorted).addInput();
111 connect(buffer->getOutputs().back(), external_merging_sorted->getInputs().back());
112
113 if (!buffer->getInputs().empty())
114 {
115 /// Serialize
116 outputs.emplace_back(header_without_constants, this);
117 connect(getOutputs().back(), buffer->getInputs().back());
118 /// Hack. Say buffer that we need data from port (otherwise it will return PortFull).
119 external_merging_sorted->getInputs().back().setNeeded();
120 }
121 else
122 /// Generate
123 static_cast<MergingSortedTransform &>(*external_merging_sorted).setHaveAllInputs();
124
125 return std::move(processors);
126}
127
128void MergeSortingTransform::consume(Chunk chunk)
129{
130 /** Algorithm:
131 * - read to memory blocks from source stream;
132 * - if too many of them and if external sorting is enabled,
133 * - merge all blocks to sorted stream and write it to temporary file;
134 * - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory.
135 */
136
137 /// If there were only const columns in sort description, then there is no need to sort.
138 /// Return the chunk as is.
139 if (description.empty())
140 {
141 generated_chunk = std::move(chunk);
142 return;
143 }
144
145 removeConstColumns(chunk);
146
147 sum_rows_in_blocks += chunk.getNumRows();
148 sum_bytes_in_blocks += chunk.allocatedBytes();
149 chunks.push_back(std::move(chunk));
150
151 /** If significant amount of data was accumulated, perform preliminary merging step.
152 */
153 if (chunks.size() > 1
154 && limit
155 && limit * 2 < sum_rows_in_blocks /// 2 is just a guess.
156 && remerge_is_useful
157 && max_bytes_before_remerge
158 && sum_bytes_in_blocks > max_bytes_before_remerge)
159 {
160 remerge();
161 }
162
163 /** If too many of them and if external sorting is enabled,
164 * will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
165 * NOTE. It's possible to check free space in filesystem.
166 */
167 if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
168 {
169 if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
170 throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
171
172 temporary_files.emplace_back(createTemporaryFile(tmp_path));
173 const std::string & path = temporary_files.back()->path();
174 merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
175 auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
176
177 processors.emplace_back(current_processor);
178
179 if (!external_merging_sorted)
180 {
181 bool quiet = false;
182 bool have_all_inputs = false;
183
184 external_merging_sorted = std::make_shared<MergingSortedTransform>(
185 header_without_constants,
186 0,
187 description,
188 max_merged_block_size,
189 limit,
190 quiet,
191 have_all_inputs);
192
193 processors.emplace_back(external_merging_sorted);
194 }
195
196 stage = Stage::Serialize;
197 sum_bytes_in_blocks = 0;
198 sum_rows_in_blocks = 0;
199 }
200}
201
202void MergeSortingTransform::serialize()
203{
204 current_chunk = merge_sorter->read();
205 if (!current_chunk)
206 merge_sorter.reset();
207}
208
209void MergeSortingTransform::generate()
210{
211 if (!generated_prefix)
212 {
213 if (temporary_files.empty())
214 merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
215 else
216 {
217 ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
218 LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
219
220 if (!chunks.empty())
221 processors.emplace_back(std::make_shared<MergeSorterSource>(
222 header_without_constants, std::move(chunks), description, max_merged_block_size, limit));
223 }
224
225 generated_prefix = true;
226 }
227
228 if (merge_sorter)
229 {
230 generated_chunk = merge_sorter->read();
231 if (!generated_chunk)
232 merge_sorter.reset();
233 else
234 enrichChunkWithConstants(generated_chunk);
235 }
236}
237
238void MergeSortingTransform::remerge()
239{
240 LOG_DEBUG(log, "Re-merging intermediate ORDER BY data (" << chunks.size()
241 << " blocks with " << sum_rows_in_blocks << " rows) to save memory consumption");
242
243 /// NOTE Maybe concat all blocks and partial sort will be faster than merge?
244 MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit);
245
246 Chunks new_chunks;
247 size_t new_sum_rows_in_blocks = 0;
248 size_t new_sum_bytes_in_blocks = 0;
249
250 while (auto chunk = remerge_sorter.read())
251 {
252 new_sum_rows_in_blocks += chunk.getNumRows();
253 new_sum_bytes_in_blocks += chunk.allocatedBytes();
254 new_chunks.emplace_back(std::move(chunk));
255 }
256
257 LOG_DEBUG(log, "Memory usage is lowered from "
258 << formatReadableSizeWithBinarySuffix(sum_bytes_in_blocks) << " to "
259 << formatReadableSizeWithBinarySuffix(new_sum_bytes_in_blocks));
260
261 /// If the memory consumption was not lowered enough - we will not perform remerge anymore. 2 is a guess.
262 if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
263 remerge_is_useful = false;
264
265 chunks = std::move(new_chunks);
266 sum_rows_in_blocks = new_sum_rows_in_blocks;
267 sum_bytes_in_blocks = new_sum_bytes_in_blocks;
268}
269
270}
271