1#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
2
3#include <Processors/ISimpleTransform.h>
4#include <Interpreters/Aggregator.h>
5#include <Processors/ResizeProcessor.h>
6
7namespace DB
8{
9
10struct ChunksToMerge : public ChunkInfo
11{
12 std::unique_ptr<Chunks> chunks;
13 Int32 bucket_num = -1;
14 bool is_overflows = false;
15};
16
17GroupingAggregatedTransform::GroupingAggregatedTransform(
18 const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_)
19 : IProcessor(InputPorts(num_inputs_, header_), { Block() })
20 , num_inputs(num_inputs_)
21 , params(std::move(params_))
22 , last_bucket_number(num_inputs, -1)
23 , read_from_input(num_inputs, false)
24{
25}
26
27void GroupingAggregatedTransform::readFromAllInputs()
28{
29 auto in = inputs.begin();
30 for (size_t i = 0; i < num_inputs; ++i, ++in)
31 {
32 if (in->isFinished())
33 continue;
34
35 if (read_from_input[i])
36 continue;
37
38 in->setNeeded();
39
40 if (!in->hasData())
41 return;
42
43 auto chunk = in->pull();
44 read_from_input[i] = true;
45 addChunk(std::move(chunk), i);
46 }
47
48 read_from_all_inputs = true;
49}
50
51void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
52{
53 auto & output = outputs.front();
54
55 auto info = std::make_shared<ChunksToMerge>();
56 info->bucket_num = bucket;
57 info->is_overflows = is_overflows;
58 info->chunks = std::make_unique<Chunks>(std::move(chunks));
59
60 Chunk chunk;
61 chunk.setChunkInfo(std::move(info));
62 output.push(std::move(chunk));
63}
64
65bool GroupingAggregatedTransform::tryPushTwoLevelData()
66{
67 auto try_push_by_iter = [&](auto batch_it)
68 {
69 if (batch_it == chunks_map.end())
70 return false;
71
72 Chunks & cur_chunks = batch_it->second;
73 if (cur_chunks.empty())
74 {
75 chunks_map.erase(batch_it);
76 return false;
77 }
78
79 pushData(std::move(cur_chunks), batch_it->first, false);
80 chunks_map.erase(batch_it);
81 return true;
82 };
83
84 if (all_inputs_finished)
85 {
86 /// Chunks are sorted by bucket.
87 while (!chunks_map.empty())
88 if (try_push_by_iter(chunks_map.begin()))
89 return true;
90 }
91 else
92 {
93 for (; next_bucket_to_push < current_bucket; ++next_bucket_to_push)
94 if (try_push_by_iter(chunks_map.find(next_bucket_to_push)))
95 return true;
96 }
97
98 return false;
99}
100
101bool GroupingAggregatedTransform::tryPushSingleLevelData()
102{
103 if (single_level_chunks.empty())
104 return false;
105
106 pushData(std::move(single_level_chunks), -1, false);
107 return true;
108}
109
110bool GroupingAggregatedTransform::tryPushOverflowData()
111{
112 if (overflow_chunks.empty())
113 return false;
114
115 pushData(std::move(overflow_chunks), -1, true);
116 return true;
117}
118
/// Drives GroupingAggregatedTransform.
///
/// 1. Read the first chunk from every input to learn whether any source produces two-level data.
/// 2. If two-level data was seen while single-level chunks are buffered, return Status::Ready;
///    work() then converts those chunks to two-level form.
/// 3. Read the inputs bucket by bucket. current_bucket is advanced only after an iteration in which
///    no unfinished input still needs data for it. Buckets below current_bucket are pushed downstream
///    by tryPushTwoLevelData().
/// 4. When every input is finished, push the remaining data: two-level buckets, or all single-level
///    chunks. Then push the overflow chunks, and finally finish the output.
IProcessor::Status GroupingAggregatedTransform::prepare()
{
    /// Check can output.
    auto & output = outputs.front();

    /// Downstream no longer accepts data: close the inputs and drop buffered state.
    if (output.isFinished())
    {
        for (auto & input : inputs)
            input.close();

        chunks_map.clear();
        last_bucket_number.clear();
        return Status::Finished;
    }

    /// Read first time from each input to understand if we have two-level aggregation.
    if (!read_from_all_inputs)
    {
        readFromAllInputs();
        if (!read_from_all_inputs)
            return Status::NeedData;
    }

    /// Convert single level to two levels if have two-level input.
    /// The conversion itself is done in work().
    if (has_two_level && !single_level_chunks.empty())
        return Status::Ready;

    /// Check can push (to avoid data caching).
    if (!output.canPush())
    {
        for (auto & input : inputs)
            input.setNotNeeded();

        return Status::PortFull;
    }

    bool pushed_to_output = false;

    /// Output if has data.
    if (has_two_level)
        pushed_to_output = tryPushTwoLevelData();

    /// An input must still be read for current_bucket in either of two cases:
    /// - its last received bucket is below current_bucket;
    /// - several chunks per bucket are expected and the input is currently at current_bucket.
    auto need_input = [this](size_t input_num)
    {
        if (last_bucket_number[input_num] < current_bucket)
            return true;

        return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
    };

    /// Read next bucket if can.
    /// Each iteration polls every unfinished input that still needs data for current_bucket.
    /// current_bucket is incremented only when such an iteration neither returns nor finds all inputs finished.
    for (; ; ++current_bucket)
    {
        bool finished = true;
        bool need_data = false;

        auto in = inputs.begin();
        for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in)
        {
            if (in->isFinished())
                continue;

            finished = false;

            if (!need_input(input_num))
                continue;

            in->setNeeded();

            if (!in->hasData())
            {
                need_data = true;
                continue;
            }

            auto chunk = in->pull();
            addChunk(std::move(chunk), input_num);

            /// A single-level chunk arrived while the data is two-level: let work() convert it first.
            if (has_two_level && !single_level_chunks.empty())
                return Status::Ready;

            /// need_input() still holds for this input, so more data is required from it.
            if (!in->isFinished() && need_input(input_num))
                need_data = true;
        }

        if (finished)
        {
            all_inputs_finished = true;
            break;
        }

        if (need_data)
            return Status::NeedData;
    }

    /// The loop above exits only through `finished`, so all_inputs_finished is true from here on.

    /// Only one chunk fits into the output port; the remaining data is pushed on later calls.
    if (pushed_to_output)
        return Status::PortFull;

    if (has_two_level)
    {
        if (tryPushTwoLevelData())
            return Status::PortFull;

        /// Sanity check. If new bucket was read, we should be able to push it.
        /// NOTE(review): all_inputs_finished is always true here (see above), so this looks unreachable.
        if (!all_inputs_finished)
            throw Exception("GroupingAggregatedTransform has read new two-level bucket, but couldn't push it.",
                            ErrorCodes::LOGICAL_ERROR);
    }
    else
    {
        /// Single-level data can be pushed only after all inputs are finished.
        if (!all_inputs_finished)
            throw Exception("GroupingAggregatedTransform should have read all chunks for single level aggregation, "
                            "but not all of the inputs are finished.", ErrorCodes::LOGICAL_ERROR);

        if (tryPushSingleLevelData())
            return Status::PortFull;
    }

    /// If we haven't pushed to output, then all data was read. Push overflows if have.
    if (tryPushOverflowData())
        return Status::PortFull;

    output.finish();
    return Status::Finished;
}
244
245void GroupingAggregatedTransform::addChunk(Chunk chunk, size_t input)
246{
247 auto & info = chunk.getChunkInfo();
248 if (!info)
249 throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
250
251 auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
252 if (!agg_info)
253 throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
254
255 Int32 bucket = agg_info->bucket_num;
256 bool is_overflows = agg_info->is_overflows;
257
258 if (is_overflows)
259 overflow_chunks.emplace_back(std::move(chunk));
260 else if (bucket < 0)
261 single_level_chunks.emplace_back(std::move(chunk));
262 else
263 {
264 chunks_map[bucket].emplace_back(std::move(chunk));
265 has_two_level = true;
266 last_bucket_number[input] = bucket;
267 }
268}
269
270void GroupingAggregatedTransform::work()
271{
272 if (!single_level_chunks.empty())
273 {
274 auto & header = getOutputs().front().getHeader();
275 auto block = header.cloneWithColumns(single_level_chunks.back().detachColumns());
276 single_level_chunks.pop_back();
277 auto blocks = params->aggregator.convertBlockToTwoLevel(block);
278
279 for (auto & cur_block : blocks)
280 {
281 Int32 bucket = cur_block.info.bucket_num;
282 chunks_map[bucket].emplace_back(Chunk(cur_block.getColumns(), cur_block.rows()));
283 }
284 }
285}
286
287
288MergingAggregatedBucketTransform::MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params_)
289 : ISimpleTransform({}, params_->getHeader(), false), params(std::move(params_))
290{
291 setInputNotNeededAfterRead(true);
292}
293
294void MergingAggregatedBucketTransform::transform(Chunk & chunk)
295{
296 auto & info = chunk.getChunkInfo();
297 auto * chunks_to_merge = typeid_cast<const ChunksToMerge *>(info.get());
298
299 if (!chunks_to_merge)
300 throw Exception("MergingAggregatedSimpleTransform chunk must have ChunkInfo with type ChunksToMerge.",
301 ErrorCodes::LOGICAL_ERROR);
302
303 auto header = params->aggregator.getHeader(false);
304
305 BlocksList blocks_list;
306 for (auto & cur_chunk : *chunks_to_merge->chunks)
307 {
308 auto & cur_info = cur_chunk.getChunkInfo();
309 if (!cur_info)
310 throw Exception("Chunk info was not set for chunk in MergingAggregatedBucketTransform.",
311 ErrorCodes::LOGICAL_ERROR);
312
313 auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(cur_info.get());
314 if (!agg_info)
315 throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedBucketTransform.",
316 ErrorCodes::LOGICAL_ERROR);
317
318 Block block = header.cloneWithColumns(cur_chunk.detachColumns());
319 block.info.is_overflows = agg_info->is_overflows;
320 block.info.bucket_num = agg_info->bucket_num;
321
322 blocks_list.emplace_back(std::move(block));
323 }
324
325 auto res_info = std::make_shared<AggregatedChunkInfo>();
326 res_info->is_overflows = chunks_to_merge->is_overflows;
327 res_info->bucket_num = chunks_to_merge->bucket_num;
328 chunk.setChunkInfo(std::move(res_info));
329
330 auto block = params->aggregator.mergeBlocks(blocks_list, params->final);
331 size_t num_rows = block.rows();
332 chunk.setColumns(block.getColumns(), num_rows);
333}
334
335
336SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs_, AggregatingTransformParamsPtr params_)
337 : IProcessor(InputPorts(num_inputs_, params_->getHeader()), {params_->getHeader()})
338 , num_inputs(num_inputs_)
339 , params(std::move(params_))
340 , last_bucket_number(num_inputs, -1)
341 , is_input_finished(num_inputs, false)
342{
343}
344
345bool SortingAggregatedTransform::tryPushChunk()
346{
347 auto & output = outputs.front();
348
349 if (chunks.empty())
350 return false;
351
352 /// Chunk with min current bucket.
353 auto it = chunks.begin();
354 auto cur_bucket = it->first;
355
356 /// Check that can push it
357 for (size_t input = 0; input < num_inputs; ++input)
358 if (!is_input_finished[input] && last_bucket_number[input] < cur_bucket)
359 return false;
360
361 output.push(std::move(it->second));
362 chunks.erase(it);
363 return true;
364}
365
366void SortingAggregatedTransform::addChunk(Chunk chunk, size_t from_input)
367{
368 auto & info = chunk.getChunkInfo();
369 if (!info)
370 throw Exception("Chunk info was not set for chunk in SortingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
371
372 auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
373 if (!agg_info)
374 throw Exception("Chunk should have AggregatedChunkInfo in SortingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
375
376 Int32 bucket = agg_info->bucket_num;
377 bool is_overflows = agg_info->is_overflows;
378
379 if (is_overflows)
380 overflow_chunk = std::move(chunk);
381 else
382 {
383 if (chunks[bucket])
384 throw Exception("SortingAggregatedTransform already got bucket with number " + toString(bucket),
385 ErrorCodes::LOGICAL_ERROR);
386
387 chunks[bucket] = std::move(chunk);
388 last_bucket_number[from_input] = bucket;
389 }
390}
391
/// Drives SortingAggregatedTransform.
/// Merged chunks from all inputs are buffered and pushed in ascending bucket order (see tryPushChunk()).
/// Once every input is finished, the overflow chunk (if any) is pushed and the output is finished.
IProcessor::Status SortingAggregatedTransform::prepare()
{
    /// Check can output.
    auto & output = outputs.front();

    /// Downstream no longer accepts data: close the inputs and drop buffered state.
    if (output.isFinished())
    {
        for (auto & input : inputs)
            input.close();

        chunks.clear();
        last_bucket_number.clear();
        return Status::Finished;
    }

    /// Check can push (to avoid data caching).
    if (!output.canPush())
    {
        for (auto & input : inputs)
            input.setNotNeeded();

        return Status::PortFull;
    }

    /// Push if have min version: the smallest buffered bucket, provided no unfinished input is still behind it.
    bool pushed_to_output = tryPushChunk();

    bool need_data = false;
    bool all_finished = true;

    /// Try read anything.
    auto in = inputs.begin();
    for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in)
    {
        if (in->isFinished())
        {
            is_input_finished[input_num] = true;
            continue;
        }

        /// all_finished is cleared below on every path that leaves this input unfinished.

        in->setNeeded();

        if (!in->hasData())
        {
            need_data = true;
            all_finished = false;
            continue;
        }

        auto chunk = in->pull();
        addChunk(std::move(chunk), input_num);

        if (in->isFinished())
        {
            is_input_finished[input_num] = true;
        }
        else
        {
            /// If chunk was pulled, then we need data from this port.
            need_data = true;
            all_finished = false;
        }
    }

    /// A chunk was already pushed in this call; data read above stays buffered until the port is free.
    if (pushed_to_output)
        return Status::PortFull;

    if (tryPushChunk())
        return Status::PortFull;

    if (need_data)
        return Status::NeedData;

    /// Sanity check. Every path that clears all_finished also sets need_data, so this should be unreachable.
    if (!all_finished)
        throw Exception("SortingAggregatedTransform has read bucket, but couldn't push it.",
                        ErrorCodes::LOGICAL_ERROR);

    /// All inputs are finished, so tryPushChunk() can push any bucket; overflows are sent last.
    if (overflow_chunk)
    {
        output.push(std::move(overflow_chunk));
        return Status::PortFull;
    }

    output.finish();
    return Status::Finished;
}
480
481
482Processors createMergingAggregatedMemoryEfficientPipe(
483 Block header,
484 AggregatingTransformParamsPtr params,
485 size_t num_inputs,
486 size_t num_merging_processors)
487{
488 Processors processors;
489 processors.reserve(num_merging_processors + 2);
490
491 auto grouping = std::make_shared<GroupingAggregatedTransform>(header, num_inputs, params);
492 processors.emplace_back(std::move(grouping));
493
494 if (num_merging_processors <= 1)
495 {
496 /// --> GroupingAggregated --> MergingAggregatedBucket -->
497 auto transform = std::make_shared<MergingAggregatedBucketTransform>(params);
498 connect(processors.back()->getOutputs().front(), transform->getInputPort());
499
500 processors.emplace_back(std::move(transform));
501 return processors;
502 }
503
504 /// --> --> MergingAggregatedBucket -->
505 /// --> GroupingAggregated --> ResizeProcessor --> MergingAggregatedBucket --> SortingAggregated -->
506 /// --> --> MergingAggregatedBucket -->
507
508 auto resize = std::make_shared<ResizeProcessor>(Block(), 1, num_merging_processors);
509 connect(processors.back()->getOutputs().front(), resize->getInputs().front());
510 processors.emplace_back(std::move(resize));
511
512 auto sorting = std::make_shared<SortingAggregatedTransform>(num_merging_processors, params);
513 auto out = processors.back()->getOutputs().begin();
514 auto in = sorting->getInputs().begin();
515
516 for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out)
517 {
518 auto transform = std::make_shared<MergingAggregatedBucketTransform>(params);
519 transform->setStream(i);
520 connect(*out, transform->getInputPort());
521 connect(transform->getOutputPort(), *in);
522 processors.emplace_back(std::move(transform));
523 }
524
525 processors.emplace_back(std::move(sorting));
526 return processors;
527}
528
529}
530