1#include <Processors/Transforms/MergingSortedTransform.h>
2#include <DataStreams/ColumnGathererStream.h>
3#include <IO/WriteBuffer.h>
4#include <DataStreams/materializeBlock.h>
5
6namespace DB
7{
8
9MergingSortedTransform::MergingSortedTransform(
10 const Block & header,
11 size_t num_inputs,
12 const SortDescription & description_,
13 size_t max_block_size_,
14 UInt64 limit_,
15 bool quiet_,
16 bool have_all_inputs_)
17 : IProcessor(InputPorts(num_inputs, header), {materializeBlock(header)})
18 , description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
19 , have_all_inputs(have_all_inputs_)
20 , merged_data(header), source_chunks(num_inputs), cursors(num_inputs)
21{
22 auto & sample = outputs.front().getHeader();
23 /// Replace column names in description to positions.
24 for (auto & column_description : description)
25 {
26 has_collation |= column_description.collator != nullptr;
27 if (!column_description.column_name.empty())
28 {
29 column_description.column_number = sample.getPositionByName(column_description.column_name);
30 column_description.column_name.clear();
31 }
32 }
33}
34
35void MergingSortedTransform::addInput()
36{
37 if (have_all_inputs)
38 throw Exception("MergingSortedTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
39
40 inputs.emplace_back(outputs.front().getHeader(), this);
41 source_chunks.emplace_back();
42 cursors.emplace_back();
43}
44
45void MergingSortedTransform::setHaveAllInputs()
46{
47 if (have_all_inputs)
48 throw Exception("MergingSortedTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
49
50 have_all_inputs = true;
51}
52
53IProcessor::Status MergingSortedTransform::prepare()
54{
55 if (!have_all_inputs)
56 return Status::NeedData;
57
58 auto & output = outputs.front();
59
60 /// Special case for no inputs.
61 if (inputs.empty())
62 {
63 output.finish();
64 return Status::Finished;
65 }
66
67 /// Check can output.
68
69 if (output.isFinished())
70 {
71 for (auto & in : inputs)
72 in.close();
73
74 return Status::Finished;
75 }
76
77 if (!output.isNeeded())
78 {
79 for (auto & in : inputs)
80 in.setNotNeeded();
81
82 return Status::PortFull;
83 }
84
85 if (output.hasData())
86 return Status::PortFull;
87
88 /// Special case for single input.
89 if (inputs.size() == 1)
90 {
91 auto & input = inputs.front();
92 if (input.isFinished())
93 {
94 output.finish();
95 return Status::Finished;
96 }
97
98 input.setNeeded();
99 if (input.hasData())
100 output.push(input.pull());
101
102 return Status::NeedData;
103 }
104
105 /// Push if has data.
106 if (merged_data.mergedRows())
107 output.push(merged_data.pull());
108
109 if (!is_initialized)
110 {
111 /// Check for inputs we need.
112 bool all_inputs_has_data = true;
113 auto it = inputs.begin();
114 for (size_t i = 0; it != inputs.end(); ++i, ++it)
115 {
116 auto & input = *it;
117 if (input.isFinished())
118 continue;
119
120 if (!cursors[i].empty())
121 {
122 input.setNotNeeded();
123 continue;
124 }
125
126 input.setNeeded();
127
128 if (!input.hasData())
129 {
130 all_inputs_has_data = false;
131 continue;
132 }
133
134 auto chunk = input.pull();
135 if (!chunk.hasRows())
136 {
137
138 if (!input.isFinished())
139 all_inputs_has_data = false;
140
141 continue;
142 }
143
144 updateCursor(std::move(chunk), i);
145 }
146
147 if (!all_inputs_has_data)
148 return Status::NeedData;
149
150 if (has_collation)
151 initQueue(queue_with_collation);
152 else
153 initQueue(queue_without_collation);
154
155 is_initialized = true;
156 return Status::Ready;
157 }
158 else
159 {
160 if (is_finished)
161 {
162 for (auto & input : inputs)
163 input.close();
164
165 outputs.front().finish();
166
167 return Status::Finished;
168 }
169
170 if (need_data)
171 {
172
173 auto & input = *std::next(inputs.begin(), next_input_to_read);
174 if (!input.isFinished())
175 {
176 input.setNeeded();
177
178 if (!input.hasData())
179 return Status::NeedData;
180
181 auto chunk = input.pull();
182 if (!chunk.hasRows() && !input.isFinished())
183 return Status::NeedData;
184
185 updateCursor(std::move(chunk), next_input_to_read);
186 pushToQueue(next_input_to_read);
187 }
188
189 need_data = false;
190 }
191
192 return Status::Ready;
193 }
194}
195
196void MergingSortedTransform::work()
197{
198 if (has_collation)
199 merge(queue_with_collation);
200 else
201 merge(queue_without_collation);
202}
203
/// Moves rows from the sorted cursors in `queue` into `merged_data`, smallest first.
///
/// Returns early when:
///   - `merged_data` already holds `max_block_size` rows, or the total number of merged rows has
///     reached `limit` (the latter also sets `is_finished`);
///   - a cursor's last row has been consumed: `need_data` is set and `next_input_to_read` names
///     the input whose next chunk prepare() must load before merging can continue;
///   - the top cursor is at its first row and its whole chunk sorts no later than every other
///     cursor (or it is the only cursor left): the chunk is moved at once via insertFromChunk(),
///     but only when `merged_data` is empty, so that whole chunk forms its own block.
/// If the loop runs out of cursors, there is nothing left to merge and `is_finished` is set.
///
/// When `out_row_sources_buf` is set, one RowSourcePart (the cursor's source number) is written
/// per row inserted here.
template <typename TSortCursor>
void MergingSortedTransform::merge(std::priority_queue<TSortCursor> & queue)
{
    /// Returns false if no more rows may be added right now: either `limit` total rows were
    /// merged (sets `is_finished`), or the current block reached `max_block_size`.
    auto can_read_another_row = [&, this]()
    {
        if (limit && merged_data.totalMergedRows() >= limit)
        {
            //std::cerr << "Limit reached\n";
            is_finished = true;
            return false;
        }

        if (merged_data.mergedRows() >= max_block_size)
        {
            //std::cerr << "max_block_size reached\n";
            return false;
        }

        return true;
    };

    /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
    while (!queue.empty())
    {
        /// Shouldn't happen at first iteration, but check just in case.
        if (!can_read_another_row())
            return;

        TSortCursor current = queue.top();
        queue.pop();
        bool first_iteration = true;

        /// Keep taking rows from `current` while it stays the smallest cursor.
        while (true)
        {
            /// The outer loop already checked the limits for the first row of `current`.
            /// On stopping, put the cursor back so its remaining rows are merged next time.
            if (!first_iteration && !can_read_another_row())
            {
                queue.push(current);
                return;
            }
            first_iteration = false;

            /** And what if the block is totally less or equal than the rest for the current cursor?
              * Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
              */
            if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
            {
                //std::cerr << "current block is totally less or equals\n";

                /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
                if (merged_data.mergedRows() != 0)
                {
                    //std::cerr << "merged rows is non-zero\n";
                    queue.push(current);
                    return;
                }

                /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
                size_t source_num = current.impl->order;
                insertFromChunk(source_num);
                return;
            }

            //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
            //std::cerr << "Inserting row\n";
            merged_data.insertRow(current->all_columns, current->pos);

            if (out_row_sources_buf)
            {
                /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
                RowSourcePart row_source(current.impl->order);
                out_row_sources_buf->write(row_source.data);
            }

            /// The cursor's chunk is used up. It is not pushed back: prepare() loads the next chunk
            /// of this input and re-queues the cursor.
            if (current->isLast())
            {
                need_data = true;
                next_input_to_read = current.impl->order;

                if (limit && merged_data.totalMergedRows() >= limit)
                    is_finished = true;

                return;
            }

            //std::cerr << "moving to next row\n";
            current->next();

            /// Another cursor now holds a smaller row: re-queue this one and pick the new top.
            if (!queue.empty() && current.greater(queue.top()))
            {
                //std::cerr << "next row is not least, pushing back to queue\n";
                queue.push(current);
                break;
            }
        }
    }
    is_finished = true;
}
302
303void MergingSortedTransform::insertFromChunk(size_t source_num)
304{
305 if (source_num >= cursors.size())
306 throw Exception("Logical error in MergingSortedTrandform", ErrorCodes::LOGICAL_ERROR);
307
308 //std::cerr << "copied columns\n";
309
310 auto num_rows = source_chunks[source_num]->getNumRows();
311
312 UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
313 if (limit && total_merged_rows_after_insertion > limit)
314 {
315 num_rows = total_merged_rows_after_insertion - limit;
316 merged_data.insertFromChunk(std::move(*source_chunks[source_num]), num_rows);
317 is_finished = true;
318 }
319 else
320 {
321 merged_data.insertFromChunk(std::move(*source_chunks[source_num]), 0);
322 need_data = true;
323 next_input_to_read = source_num;
324 }
325
326 if (out_row_sources_buf)
327 {
328 RowSourcePart row_source(source_num);
329 for (size_t i = 0; i < num_rows; ++i)
330 out_row_sources_buf->write(row_source.data);
331 }
332}
333
334
335}
336