| 1 | #include <Processors/Transforms/MergingAggregatedTransform.h> |
| 2 | #include <Processors/Transforms/AggregatingTransform.h> |
| 3 | |
| 4 | namespace DB |
| 5 | { |
| 6 | |
| 7 | MergingAggregatedTransform::MergingAggregatedTransform( |
| 8 | Block , AggregatingTransformParamsPtr params_, size_t max_threads_) |
| 9 | : IAccumulatingTransform(std::move(header_), params_->getHeader()) |
| 10 | , params(std::move(params_)), max_threads(max_threads_) |
| 11 | { |
| 12 | } |
| 13 | |
| 14 | void MergingAggregatedTransform::consume(Chunk chunk) |
| 15 | { |
| 16 | if (!consume_started) |
| 17 | { |
| 18 | consume_started = true; |
| 19 | LOG_TRACE(log, "Reading blocks of partially aggregated data." ); |
| 20 | } |
| 21 | |
| 22 | total_input_rows += chunk.getNumRows(); |
| 23 | ++total_input_blocks; |
| 24 | |
| 25 | auto & info = chunk.getChunkInfo(); |
| 26 | if (!info) |
| 27 | throw Exception("Chunk info was not set for chunk in MergingAggregatedTransform." , ErrorCodes::LOGICAL_ERROR); |
| 28 | |
| 29 | auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()); |
| 30 | if (!agg_info) |
| 31 | throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedTransform." , ErrorCodes::LOGICAL_ERROR); |
| 32 | |
| 33 | auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns()); |
| 34 | block.info.is_overflows = agg_info->is_overflows; |
| 35 | block.info.bucket_num = agg_info->bucket_num; |
| 36 | |
| 37 | bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block)); |
| 38 | } |
| 39 | |
| 40 | Chunk MergingAggregatedTransform::generate() |
| 41 | { |
| 42 | if (!generate_started) |
| 43 | { |
| 44 | generate_started = true; |
| 45 | LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows |
| 46 | << " rows." ); |
| 47 | |
| 48 | /// Exception safety. Make iterator valid in case any method below throws. |
| 49 | next_block = blocks.begin(); |
| 50 | |
| 51 | /// TODO: this operation can be made async. Add async for IAccumulatingTransform. |
| 52 | params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads); |
| 53 | blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); |
| 54 | next_block = blocks.begin(); |
| 55 | } |
| 56 | |
| 57 | if (next_block == blocks.end()) |
| 58 | return {}; |
| 59 | |
| 60 | auto block = std::move(*next_block); |
| 61 | ++next_block; |
| 62 | |
| 63 | auto info = std::make_shared<AggregatedChunkInfo>(); |
| 64 | info->bucket_num = block.info.bucket_num; |
| 65 | info->is_overflows = block.info.is_overflows; |
| 66 | |
| 67 | UInt64 num_rows = block.rows(); |
| 68 | Chunk chunk(block.getColumns(), num_rows); |
| 69 | chunk.setChunkInfo(std::move(info)); |
| 70 | |
| 71 | return chunk; |
| 72 | } |
| 73 | |
| 74 | } |
| 75 | |