1 | #include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h> |
2 | |
#include <Processors/ISimpleTransform.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ResizeProcessor.h>
#include <Common/typeid_cast.h>
6 | |
7 | namespace DB |
8 | { |
9 | |
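/// Chunk info that carries a batch of chunks which all belong to the same
/// bucket, so a downstream MergingAggregatedBucketTransform can merge the
/// whole bucket in one step.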
10 | struct ChunksToMerge : public ChunkInfo |
11 | { |
12 | std::unique_ptr<Chunks> chunks; |
13 | Int32 bucket_num = -1; |
14 | bool is_overflows = false; |
15 | }; |
16 | |
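/// Has many inputs and a single output.
/// Reads chunks with partially aggregated data, groups them by bucket number
/// and emits all chunks of one bucket as a single output chunk (wrapped into
/// ChunksToMerge). If any input turns out to be two-level, buffered
/// single-level chunks are converted to two-level in work().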
17 | GroupingAggregatedTransform::GroupingAggregatedTransform( |
    const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_)
19 | : IProcessor(InputPorts(num_inputs_, header_), { Block() }) |
20 | , num_inputs(num_inputs_) |
21 | , params(std::move(params_)) |
22 | , last_bucket_number(num_inputs, -1) |
23 | , read_from_input(num_inputs, false) |
24 | { |
25 | } |
26 | |
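/// Read one chunk from every input. This is done once, at the start, to learn
/// whether any source produces two-level (bucketed) data.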
27 | void GroupingAggregatedTransform::readFromAllInputs() |
28 | { |
29 | auto in = inputs.begin(); |
30 | for (size_t i = 0; i < num_inputs; ++i, ++in) |
31 | { |
32 | if (in->isFinished()) |
33 | continue; |
34 | |
35 | if (read_from_input[i]) |
36 | continue; |
37 | |
38 | in->setNeeded(); |
39 | |
40 | if (!in->hasData()) |
41 | return; |
42 | |
43 | auto chunk = in->pull(); |
44 | read_from_input[i] = true; |
45 | addChunk(std::move(chunk), i); |
46 | } |
47 | |
48 | read_from_all_inputs = true; |
49 | } |
50 | |
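/// Wrap a batch of chunks for one bucket into a ChunksToMerge info and push it
/// to the output as a single header-less chunk.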
51 | void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows) |
52 | { |
53 | auto & output = outputs.front(); |
54 | |
55 | auto info = std::make_shared<ChunksToMerge>(); |
56 | info->bucket_num = bucket; |
57 | info->is_overflows = is_overflows; |
58 | info->chunks = std::make_unique<Chunks>(std::move(chunks)); |
59 | |
60 | Chunk chunk; |
61 | chunk.setChunkInfo(std::move(info)); |
62 | output.push(std::move(chunk)); |
63 | } |
64 | |
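/// Try to push the next complete bucket. While inputs are still running, only
/// buckets below current_bucket are complete; once all inputs are finished,
/// any remaining bucket may be pushed.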
65 | bool GroupingAggregatedTransform::tryPushTwoLevelData() |
66 | { |
67 | auto try_push_by_iter = [&](auto batch_it) |
68 | { |
69 | if (batch_it == chunks_map.end()) |
70 | return false; |
71 | |
72 | Chunks & cur_chunks = batch_it->second; |
73 | if (cur_chunks.empty()) |
74 | { |
75 | chunks_map.erase(batch_it); |
76 | return false; |
77 | } |
78 | |
79 | pushData(std::move(cur_chunks), batch_it->first, false); |
80 | chunks_map.erase(batch_it); |
81 | return true; |
82 | }; |
83 | |
84 | if (all_inputs_finished) |
85 | { |
86 | /// Chunks are sorted by bucket. |
87 | while (!chunks_map.empty()) |
88 | if (try_push_by_iter(chunks_map.begin())) |
89 | return true; |
90 | } |
91 | else |
92 | { |
93 | for (; next_bucket_to_push < current_bucket; ++next_bucket_to_push) |
94 | if (try_push_by_iter(chunks_map.find(next_bucket_to_push))) |
95 | return true; |
96 | } |
97 | |
98 | return false; |
99 | } |
100 | |
101 | bool GroupingAggregatedTransform::tryPushSingleLevelData() |
102 | { |
103 | if (single_level_chunks.empty()) |
104 | return false; |
105 | |
106 | pushData(std::move(single_level_chunks), -1, false); |
107 | return true; |
108 | } |
109 | |
110 | bool GroupingAggregatedTransform::tryPushOverflowData() |
111 | { |
112 | if (overflow_chunks.empty()) |
113 | return false; |
114 | |
115 | pushData(std::move(overflow_chunks), -1, true); |
116 | return true; |
117 | } |
118 | |
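/// The overall flow of prepare():
///   1. Finish everything if the output is closed.
///   2. Read once from every input to detect two-level data.
///   3. If two-level data appeared while single-level chunks are buffered,
///      return Ready so that work() can convert them.
///   4. Otherwise advance bucket by bucket, pushing every completed bucket.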
119 | IProcessor::Status GroupingAggregatedTransform::prepare() |
120 | { |
121 | /// Check can output. |
122 | auto & output = outputs.front(); |
123 | |
124 | if (output.isFinished()) |
125 | { |
126 | for (auto & input : inputs) |
127 | input.close(); |
128 | |
129 | chunks_map.clear(); |
130 | last_bucket_number.clear(); |
131 | return Status::Finished; |
132 | } |
133 | |
    /// Read once from each input to find out whether the data is two-level aggregated.
135 | if (!read_from_all_inputs) |
136 | { |
137 | readFromAllInputs(); |
138 | if (!read_from_all_inputs) |
139 | return Status::NeedData; |
140 | } |
141 | |
    /// Convert buffered single-level data to two-level if at least one input turned out to be two-level.
143 | if (has_two_level && !single_level_chunks.empty()) |
144 | return Status::Ready; |
145 | |
146 | /// Check can push (to avoid data caching). |
147 | if (!output.canPush()) |
148 | { |
149 | for (auto & input : inputs) |
150 | input.setNotNeeded(); |
151 | |
152 | return Status::PortFull; |
153 | } |
154 | |
155 | bool pushed_to_output = false; |
156 | |
    /// Push two-level data if we have some.
158 | if (has_two_level) |
159 | pushed_to_output = tryPushTwoLevelData(); |
160 | |
161 | auto need_input = [this](size_t input_num) |
162 | { |
163 | if (last_bucket_number[input_num] < current_bucket) |
164 | return true; |
165 | |
166 | return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket; |
167 | }; |
168 | |
    /// Read the next bucket if we can.
170 | for (; ; ++current_bucket) |
171 | { |
172 | bool finished = true; |
173 | bool need_data = false; |
174 | |
175 | auto in = inputs.begin(); |
176 | for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in) |
177 | { |
178 | if (in->isFinished()) |
179 | continue; |
180 | |
181 | finished = false; |
182 | |
183 | if (!need_input(input_num)) |
184 | continue; |
185 | |
186 | in->setNeeded(); |
187 | |
188 | if (!in->hasData()) |
189 | { |
190 | need_data = true; |
191 | continue; |
192 | } |
193 | |
194 | auto chunk = in->pull(); |
195 | addChunk(std::move(chunk), input_num); |
196 | |
197 | if (has_two_level && !single_level_chunks.empty()) |
198 | return Status::Ready; |
199 | |
200 | if (!in->isFinished() && need_input(input_num)) |
201 | need_data = true; |
202 | } |
203 | |
204 | if (finished) |
205 | { |
206 | all_inputs_finished = true; |
207 | break; |
208 | } |
209 | |
210 | if (need_data) |
211 | return Status::NeedData; |
212 | } |
213 | |
214 | if (pushed_to_output) |
215 | return Status::PortFull; |
216 | |
217 | if (has_two_level) |
218 | { |
219 | if (tryPushTwoLevelData()) |
220 | return Status::PortFull; |
221 | |
        /// Sanity check. If a new bucket was read, we should be able to push it.
        if (!all_inputs_finished)
            throw Exception("GroupingAggregatedTransform has read new two-level bucket, but couldn't push it.",
                            ErrorCodes::LOGICAL_ERROR);
226 | } |
227 | else |
228 | { |
229 | if (!all_inputs_finished) |
230 | throw Exception("GroupingAggregatedTransform should have read all chunks for single level aggregation, " |
231 | "but not all of the inputs are finished." , ErrorCodes::LOGICAL_ERROR); |
232 | |
233 | if (tryPushSingleLevelData()) |
234 | return Status::PortFull; |
235 | } |
236 | |
    /// If we haven't pushed to output, then all data was read. Push overflow data if we have any.
238 | if (tryPushOverflowData()) |
239 | return Status::PortFull; |
240 | |
241 | output.finish(); |
242 | return Status::Finished; |
243 | } |
244 | |
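/// Route a chunk into the proper buffer according to its AggregatedChunkInfo:
/// overflow rows, single-level data (bucket_num < 0) or a two-level bucket.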
245 | void GroupingAggregatedTransform::addChunk(Chunk chunk, size_t input) |
246 | { |
247 | auto & info = chunk.getChunkInfo(); |
248 | if (!info) |
249 | throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform." , ErrorCodes::LOGICAL_ERROR); |
250 | |
251 | auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()); |
252 | if (!agg_info) |
253 | throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform." , ErrorCodes::LOGICAL_ERROR); |
254 | |
255 | Int32 bucket = agg_info->bucket_num; |
256 | bool is_overflows = agg_info->is_overflows; |
257 | |
258 | if (is_overflows) |
259 | overflow_chunks.emplace_back(std::move(chunk)); |
260 | else if (bucket < 0) |
261 | single_level_chunks.emplace_back(std::move(chunk)); |
262 | else |
263 | { |
264 | chunks_map[bucket].emplace_back(std::move(chunk)); |
265 | has_two_level = true; |
266 | last_bucket_number[input] = bucket; |
267 | } |
268 | } |
269 | |
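/// Convert one buffered single-level chunk into a set of two-level blocks and
/// distribute them over chunks_map; prepare() returns Ready while two-level
/// data was seen and single_level_chunks is not empty.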
270 | void GroupingAggregatedTransform::work() |
271 | { |
272 | if (!single_level_chunks.empty()) |
273 | { |
        auto & header = getOutputs().front().getHeader();
275 | auto block = header.cloneWithColumns(single_level_chunks.back().detachColumns()); |
276 | single_level_chunks.pop_back(); |
277 | auto blocks = params->aggregator.convertBlockToTwoLevel(block); |
278 | |
279 | for (auto & cur_block : blocks) |
280 | { |
281 | Int32 bucket = cur_block.info.bucket_num; |
282 | chunks_map[bucket].emplace_back(Chunk(cur_block.getColumns(), cur_block.rows())); |
283 | } |
284 | } |
285 | } |
286 | |
287 | |
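/// Merges all chunks of a single bucket (packed into ChunksToMerge by
/// GroupingAggregatedTransform) into one chunk with fully merged aggregated
/// data for that bucket.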
288 | MergingAggregatedBucketTransform::MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params_) |
289 | : ISimpleTransform({}, params_->getHeader(), false), params(std::move(params_)) |
290 | { |
291 | setInputNotNeededAfterRead(true); |
292 | } |
293 | |
294 | void MergingAggregatedBucketTransform::transform(Chunk & chunk) |
295 | { |
296 | auto & info = chunk.getChunkInfo(); |
297 | auto * chunks_to_merge = typeid_cast<const ChunksToMerge *>(info.get()); |
298 | |
299 | if (!chunks_to_merge) |
300 | throw Exception("MergingAggregatedSimpleTransform chunk must have ChunkInfo with type ChunksToMerge." , |
301 | ErrorCodes::LOGICAL_ERROR); |
302 | |
    auto header = params->aggregator.getHeader(false);
304 | |
305 | BlocksList blocks_list; |
306 | for (auto & cur_chunk : *chunks_to_merge->chunks) |
307 | { |
308 | auto & cur_info = cur_chunk.getChunkInfo(); |
309 | if (!cur_info) |
310 | throw Exception("Chunk info was not set for chunk in MergingAggregatedBucketTransform." , |
311 | ErrorCodes::LOGICAL_ERROR); |
312 | |
313 | auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(cur_info.get()); |
314 | if (!agg_info) |
315 | throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedBucketTransform." , |
316 | ErrorCodes::LOGICAL_ERROR); |
317 | |
318 | Block block = header.cloneWithColumns(cur_chunk.detachColumns()); |
319 | block.info.is_overflows = agg_info->is_overflows; |
320 | block.info.bucket_num = agg_info->bucket_num; |
321 | |
322 | blocks_list.emplace_back(std::move(block)); |
323 | } |
324 | |
325 | auto res_info = std::make_shared<AggregatedChunkInfo>(); |
326 | res_info->is_overflows = chunks_to_merge->is_overflows; |
327 | res_info->bucket_num = chunks_to_merge->bucket_num; |
328 | chunk.setChunkInfo(std::move(res_info)); |
329 | |
330 | auto block = params->aggregator.mergeBlocks(blocks_list, params->final); |
331 | size_t num_rows = block.rows(); |
332 | chunk.setColumns(block.getColumns(), num_rows); |
333 | } |
334 | |
335 | |
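/// Has many inputs and a single output.
/// Reads merged buckets from the inputs and restores the global bucket order
/// on the output. Assumes every input sends chunks with increasing bucket
/// numbers and that each bucket arrives from exactly one input.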
336 | SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs_, AggregatingTransformParamsPtr params_) |
337 | : IProcessor(InputPorts(num_inputs_, params_->getHeader()), {params_->getHeader()}) |
338 | , num_inputs(num_inputs_) |
339 | , params(std::move(params_)) |
340 | , last_bucket_number(num_inputs, -1) |
341 | , is_input_finished(num_inputs, false) |
342 | { |
343 | } |
344 | |
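/// Push the buffered chunk with the minimal bucket number, but only when no
/// unfinished input may still produce a smaller bucket.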
345 | bool SortingAggregatedTransform::tryPushChunk() |
346 | { |
347 | auto & output = outputs.front(); |
348 | |
349 | if (chunks.empty()) |
350 | return false; |
351 | |
    /// Take the chunk with the minimal bucket number.
353 | auto it = chunks.begin(); |
354 | auto cur_bucket = it->first; |
355 | |
    /// Check that we can push it: every unfinished input must have advanced past this bucket.
357 | for (size_t input = 0; input < num_inputs; ++input) |
358 | if (!is_input_finished[input] && last_bucket_number[input] < cur_bucket) |
359 | return false; |
360 | |
361 | output.push(std::move(it->second)); |
362 | chunks.erase(it); |
363 | return true; |
364 | } |
365 | |
366 | void SortingAggregatedTransform::addChunk(Chunk chunk, size_t from_input) |
367 | { |
368 | auto & info = chunk.getChunkInfo(); |
369 | if (!info) |
370 | throw Exception("Chunk info was not set for chunk in SortingAggregatedTransform." , ErrorCodes::LOGICAL_ERROR); |
371 | |
372 | auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()); |
373 | if (!agg_info) |
374 | throw Exception("Chunk should have AggregatedChunkInfo in SortingAggregatedTransform." , ErrorCodes::LOGICAL_ERROR); |
375 | |
376 | Int32 bucket = agg_info->bucket_num; |
377 | bool is_overflows = agg_info->is_overflows; |
378 | |
379 | if (is_overflows) |
380 | overflow_chunk = std::move(chunk); |
381 | else |
382 | { |
383 | if (chunks[bucket]) |
384 | throw Exception("SortingAggregatedTransform already got bucket with number " + toString(bucket), |
385 | ErrorCodes::LOGICAL_ERROR); |
386 | |
387 | chunks[bucket] = std::move(chunk); |
388 | last_bucket_number[from_input] = bucket; |
389 | } |
390 | } |
391 | |
392 | IProcessor::Status SortingAggregatedTransform::prepare() |
393 | { |
394 | /// Check can output. |
395 | auto & output = outputs.front(); |
396 | |
397 | if (output.isFinished()) |
398 | { |
399 | for (auto & input : inputs) |
400 | input.close(); |
401 | |
402 | chunks.clear(); |
403 | last_bucket_number.clear(); |
404 | return Status::Finished; |
405 | } |
406 | |
407 | /// Check can push (to avoid data caching). |
408 | if (!output.canPush()) |
409 | { |
410 | for (auto & input : inputs) |
411 | input.setNotNeeded(); |
412 | |
413 | return Status::PortFull; |
414 | } |
415 | |
    /// Push the chunk for the minimal bucket if no input can still send a smaller one.
417 | bool pushed_to_output = tryPushChunk(); |
418 | |
419 | bool need_data = false; |
420 | bool all_finished = true; |
421 | |
    /// Try to read something from every input.
423 | auto in = inputs.begin(); |
424 | for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in) |
425 | { |
426 | if (in->isFinished()) |
427 | { |
428 | is_input_finished[input_num] = true; |
429 | continue; |
430 | } |
431 | |
434 | in->setNeeded(); |
435 | |
436 | if (!in->hasData()) |
437 | { |
438 | need_data = true; |
439 | all_finished = false; |
440 | continue; |
441 | } |
442 | |
443 | auto chunk = in->pull(); |
444 | addChunk(std::move(chunk), input_num); |
445 | |
446 | if (in->isFinished()) |
447 | { |
448 | is_input_finished[input_num] = true; |
449 | } |
450 | else |
451 | { |
452 | /// If chunk was pulled, then we need data from this port. |
453 | need_data = true; |
454 | all_finished = false; |
455 | } |
456 | } |
457 | |
458 | if (pushed_to_output) |
459 | return Status::PortFull; |
460 | |
461 | if (tryPushChunk()) |
462 | return Status::PortFull; |
463 | |
464 | if (need_data) |
465 | return Status::NeedData; |
466 | |
467 | if (!all_finished) |
468 | throw Exception("SortingAggregatedTransform has read bucket, but couldn't push it." , |
469 | ErrorCodes::LOGICAL_ERROR); |
470 | |
471 | if (overflow_chunk) |
472 | { |
473 | output.push(std::move(overflow_chunk)); |
474 | return Status::PortFull; |
475 | } |
476 | |
477 | output.finish(); |
478 | return Status::Finished; |
479 | } |
480 | |
481 | |
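/// Build the memory-efficient merging pipeline: GroupingAggregated groups
/// partially aggregated chunks by bucket, num_merging_processors
/// MergingAggregatedBucket transforms merge buckets in parallel, and
/// SortingAggregated restores the bucket order. With a single merging
/// processor neither resize nor sorting is needed.
///
/// A rough usage sketch (the surrounding pipeline wiring is illustrative,
/// not part of this file):
///
///     auto merging = createMergingAggregatedMemoryEfficientPipe(
///         header, params, /*num_inputs=*/ num_streams, /*num_merging_processors=*/ num_threads);
///     /// Connect the num_streams upstream outputs to the inputs of
///     /// merging.front(), then read from the single output of merging.back().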
482 | Processors createMergingAggregatedMemoryEfficientPipe( |
    Block header,
484 | AggregatingTransformParamsPtr params, |
485 | size_t num_inputs, |
486 | size_t num_merging_processors) |
487 | { |
488 | Processors processors; |
489 | processors.reserve(num_merging_processors + 2); |
490 | |
491 | auto grouping = std::make_shared<GroupingAggregatedTransform>(header, num_inputs, params); |
492 | processors.emplace_back(std::move(grouping)); |
493 | |
494 | if (num_merging_processors <= 1) |
495 | { |
496 | /// --> GroupingAggregated --> MergingAggregatedBucket --> |
497 | auto transform = std::make_shared<MergingAggregatedBucketTransform>(params); |
498 | connect(processors.back()->getOutputs().front(), transform->getInputPort()); |
499 | |
500 | processors.emplace_back(std::move(transform)); |
501 | return processors; |
502 | } |
503 | |
504 | /// --> --> MergingAggregatedBucket --> |
505 | /// --> GroupingAggregated --> ResizeProcessor --> MergingAggregatedBucket --> SortingAggregated --> |
506 | /// --> --> MergingAggregatedBucket --> |
507 | |
508 | auto resize = std::make_shared<ResizeProcessor>(Block(), 1, num_merging_processors); |
509 | connect(processors.back()->getOutputs().front(), resize->getInputs().front()); |
510 | processors.emplace_back(std::move(resize)); |
511 | |
512 | auto sorting = std::make_shared<SortingAggregatedTransform>(num_merging_processors, params); |
513 | auto out = processors.back()->getOutputs().begin(); |
514 | auto in = sorting->getInputs().begin(); |
515 | |
516 | for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out) |
517 | { |
518 | auto transform = std::make_shared<MergingAggregatedBucketTransform>(params); |
519 | transform->setStream(i); |
520 | connect(*out, transform->getInputPort()); |
521 | connect(transform->getOutputPort(), *in); |
522 | processors.emplace_back(std::move(transform)); |
523 | } |
524 | |
525 | processors.emplace_back(std::move(sorting)); |
526 | return processors; |
527 | } |
528 | |
529 | } |
530 | |