1#include <Processors/Transforms/MergingAggregatedTransform.h>
2#include <Processors/Transforms/AggregatingTransform.h>
3
4namespace DB
5{
6
7MergingAggregatedTransform::MergingAggregatedTransform(
8 Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
9 : IAccumulatingTransform(std::move(header_), params_->getHeader())
10 , params(std::move(params_)), max_threads(max_threads_)
11{
12}
13
14void MergingAggregatedTransform::consume(Chunk chunk)
15{
16 if (!consume_started)
17 {
18 consume_started = true;
19 LOG_TRACE(log, "Reading blocks of partially aggregated data.");
20 }
21
22 total_input_rows += chunk.getNumRows();
23 ++total_input_blocks;
24
25 auto & info = chunk.getChunkInfo();
26 if (!info)
27 throw Exception("Chunk info was not set for chunk in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
28
29 auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
30 if (!agg_info)
31 throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
32
33 auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
34 block.info.is_overflows = agg_info->is_overflows;
35 block.info.bucket_num = agg_info->bucket_num;
36
37 bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
38}
39
40Chunk MergingAggregatedTransform::generate()
41{
42 if (!generate_started)
43 {
44 generate_started = true;
45 LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
46 << " rows.");
47
48 /// Exception safety. Make iterator valid in case any method below throws.
49 next_block = blocks.begin();
50
51 /// TODO: this operation can be made async. Add async for IAccumulatingTransform.
52 params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads);
53 blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
54 next_block = blocks.begin();
55 }
56
57 if (next_block == blocks.end())
58 return {};
59
60 auto block = std::move(*next_block);
61 ++next_block;
62
63 auto info = std::make_shared<AggregatedChunkInfo>();
64 info->bucket_num = block.info.bucket_num;
65 info->is_overflows = block.info.is_overflows;
66
67 UInt64 num_rows = block.rows();
68 Chunk chunk(block.getColumns(), num_rows);
69 chunk.setChunkInfo(std::move(info));
70
71 return chunk;
72}
73
74}
75