1#include <queue>
2#include <iomanip>
3#include <sstream>
4
5#include <DataStreams/MergingSortedBlockInputStream.h>
6#include <DataStreams/ColumnGathererStream.h>
7
8
9namespace DB
10{
11
/// Error codes referenced by this file. Only declarations (extern); the definitions are elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
17
18
19MergingSortedBlockInputStream::MergingSortedBlockInputStream(
20 const BlockInputStreams & inputs_, const SortDescription & description_,
21 size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_, bool average_block_sizes_)
22 : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
23 , average_block_sizes(average_block_sizes_), source_blocks(inputs_.size())
24 , cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
25{
26 children.insert(children.end(), inputs_.begin(), inputs_.end());
27 header = children.at(0)->getHeader();
28 num_columns = header.columns();
29}
30
31void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
32{
33 /// Read the first blocks, initialize the queue.
34 if (first)
35 {
36 first = false;
37
38 for (size_t i = 0; i < source_blocks.size(); ++i)
39 {
40 SharedBlockPtr & shared_block_ptr = source_blocks[i];
41
42 if (shared_block_ptr.get())
43 continue;
44
45 shared_block_ptr = new detail::SharedBlock(children[i]->read());
46
47 const size_t rows = shared_block_ptr->rows();
48
49 if (rows == 0)
50 continue;
51
52 if (expected_block_size < rows)
53 expected_block_size = std::min(rows, max_block_size);
54
55 cursors[i] = SortCursorImpl(*shared_block_ptr, description, i);
56 shared_block_ptr->all_columns = cursors[i].all_columns;
57 shared_block_ptr->sort_columns = cursors[i].sort_columns;
58 has_collation |= cursors[i].has_collation;
59 }
60
61 if (has_collation)
62 initQueue(queue_with_collation);
63 else
64 initQueue(queue_without_collation);
65 }
66
67 /// Let's check that all source blocks have the same structure.
68 for (const SharedBlockPtr & shared_block_ptr : source_blocks)
69 {
70 if (!*shared_block_ptr)
71 continue;
72
73 assertBlocksHaveEqualStructure(*shared_block_ptr, header, getName());
74 }
75
76 merged_columns.resize(num_columns);
77 for (size_t i = 0; i < num_columns; ++i)
78 {
79 merged_columns[i] = header.safeGetByPosition(i).column->cloneEmpty();
80 merged_columns[i]->reserve(expected_block_size);
81 }
82}
83
84
85template <typename TSortCursor>
86void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> & queue)
87{
88 for (size_t i = 0; i < cursors.size(); ++i)
89 if (!cursors[i].empty())
90 queue.push(TSortCursor(&cursors[i]));
91}
92
93
94Block MergingSortedBlockInputStream::readImpl()
95{
96 if (finished)
97 return {};
98
99 if (children.size() == 1)
100 return children[0]->read();
101
102 MutableColumns merged_columns;
103
104 init(merged_columns);
105 if (merged_columns.empty())
106 return {};
107
108 if (has_collation)
109 merge(merged_columns, queue_with_collation);
110 else
111 merge(merged_columns, queue_without_collation);
112
113 return header.cloneWithColumns(std::move(merged_columns));
114}
115
116
117template <typename TSortCursor>
118void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
119{
120 size_t order = current->order;
121 size_t size = cursors.size();
122
123 if (order >= size || &cursors[order] != current.impl)
124 throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
125
126 while (true)
127 {
128 source_blocks[order] = new detail::SharedBlock(children[order]->read());
129
130 if (!*source_blocks[order])
131 break;
132
133 if (source_blocks[order]->rows())
134 {
135 cursors[order].reset(*source_blocks[order]);
136 queue.push(TSortCursor(&cursors[order]));
137 source_blocks[order]->all_columns = cursors[order].all_columns;
138 source_blocks[order]->sort_columns = cursors[order].sort_columns;
139 break;
140 }
141 }
142}
143
144
145bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const
146{
147 if (!count_average)
148 return sum_rows_count == max_block_size;
149
150 if (sum_rows_count == 0)
151 return false;
152
153 size_t average = sum_blocks_granularity / sum_rows_count;
154 return sum_rows_count >= average;
155}
156
157template
158void MergingSortedBlockInputStream::fetchNextBlock<SortCursor>(const SortCursor & current, std::priority_queue<SortCursor> & queue);
159
160template
161void MergingSortedBlockInputStream::fetchNextBlock<SortCursorWithCollation>(const SortCursorWithCollation & current, std::priority_queue<SortCursorWithCollation> & queue);
162
163
164template <typename TSortCursor>
165void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
166{
167 size_t merged_rows = 0;
168
169 MergeStopCondition stop_condition(average_block_sizes, max_block_size);
170 /** Increase row counters.
171 * Return true if it's time to finish generating the current data block.
172 */
173 auto count_row_and_check_limit = [&, this](size_t current_granularity)
174 {
175 ++total_merged_rows;
176 if (limit && total_merged_rows == limit)
177 {
178 // std::cerr << "Limit reached\n";
179 cancel(false);
180 finished = true;
181 return true;
182 }
183
184 ++merged_rows;
185 stop_condition.addRowWithGranularity(current_granularity);
186 return stop_condition.checkStop();
187 };
188
189 /// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size`
190 while (!queue.empty())
191 {
192 TSortCursor current = queue.top();
193 size_t current_block_granularity = current->rows;
194 queue.pop();
195
196 while (true)
197 {
198 /** And what if the block is totally less or equal than the rest for the current cursor?
199 * Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
200 */
201 if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
202 {
203 // std::cerr << "current block is totally less or equals\n";
204
205 /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
206 if (merged_rows != 0)
207 {
208 //std::cerr << "merged rows is non-zero\n";
209 queue.push(current);
210 return;
211 }
212
213 /// Actually, current->order stores source number (i.e. cursors[current->order] == current)
214 size_t source_num = current->order;
215
216 if (source_num >= cursors.size())
217 throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
218
219 for (size_t i = 0; i < num_columns; ++i)
220 merged_columns[i] = (*std::move(source_blocks[source_num]->getByPosition(i).column)).mutate();
221
222 // std::cerr << "copied columns\n";
223
224 merged_rows = merged_columns.at(0)->size();
225
226 /// Limit output
227 if (limit && total_merged_rows + merged_rows > limit)
228 {
229 merged_rows = limit - total_merged_rows;
230 for (size_t i = 0; i < num_columns; ++i)
231 {
232 auto & column = merged_columns[i];
233 column = (*column->cut(0, merged_rows)).mutate();
234 }
235
236 cancel(false);
237 finished = true;
238 }
239
240 /// Write order of rows for other columns
241 /// this data will be used in grather stream
242 if (out_row_sources_buf)
243 {
244 RowSourcePart row_source(source_num);
245 for (size_t i = 0; i < merged_rows; ++i)
246 out_row_sources_buf->write(row_source.data);
247 }
248
249 //std::cerr << "fetching next block\n";
250
251 total_merged_rows += merged_rows;
252 fetchNextBlock(current, queue);
253 return;
254 }
255
256 // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
257 // std::cerr << "Inserting row\n";
258 for (size_t i = 0; i < num_columns; ++i)
259 merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
260
261 if (out_row_sources_buf)
262 {
263 /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
264 RowSourcePart row_source(current->order);
265 out_row_sources_buf->write(row_source.data);
266 }
267
268 if (!current->isLast())
269 {
270 // std::cerr << "moving to next row\n";
271 current->next();
272
273 if (queue.empty() || !(current.greater(queue.top())))
274 {
275 if (count_row_and_check_limit(current_block_granularity))
276 {
277 // std::cerr << "pushing back to queue\n";
278 queue.push(current);
279 return;
280 }
281
282 /// Do not put the cursor back in the queue, but continue to work with the current cursor.
283 // std::cerr << "current is still on top, using current row\n";
284 continue;
285 }
286 else
287 {
288 // std::cerr << "next row is not least, pushing back to queue\n";
289 queue.push(current);
290 }
291 }
292 else
293 {
294 /// We get the next block from the corresponding source, if there is one.
295 // std::cerr << "It was last row, fetching next block\n";
296 fetchNextBlock(current, queue);
297 }
298
299 break;
300 }
301
302 if (count_row_and_check_limit(current_block_granularity))
303 return;
304 }
305
306 cancel(false);
307 finished = true;
308}
309
310
311void MergingSortedBlockInputStream::readSuffixImpl()
312{
313 if (quiet)
314 return;
315
316 const BlockStreamProfileInfo & profile_info = getProfileInfo();
317 double seconds = profile_info.total_stopwatch.elapsedSeconds();
318
319 std::stringstream message;
320 message << std::fixed << std::setprecision(2)
321 << "Merge sorted " << profile_info.blocks << " blocks, " << profile_info.rows << " rows"
322 << " in " << seconds << " sec.";
323
324 if (seconds)
325 message << ", "
326 << profile_info.rows / seconds << " rows/sec., "
327 << profile_info.bytes / 1000000.0 / seconds << " MB/sec.";
328
329 LOG_DEBUG(log, message.str());
330}
331
332}
333