1 | #include <Processors/Transforms/RollupTransform.h> |
2 | #include <Processors/Transforms/TotalsHavingTransform.h> |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | RollupTransform::RollupTransform(Block , AggregatingTransformParamsPtr params_) |
8 | : IAccumulatingTransform(std::move(header), params_->getHeader()) |
9 | , params(std::move(params_)) |
10 | , keys(params->params.keys) |
11 | { |
12 | } |
13 | |
14 | void RollupTransform::consume(Chunk chunk) |
15 | { |
16 | consumed_chunks.emplace_back(std::move(chunk)); |
17 | } |
18 | |
19 | Chunk RollupTransform::merge(Chunks && chunks, bool final) |
20 | { |
21 | BlocksList rollup_blocks; |
22 | for (auto & chunk : chunks) |
23 | rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); |
24 | |
25 | auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final); |
26 | auto num_rows = rollup_block.rows(); |
27 | return Chunk(rollup_block.getColumns(), num_rows); |
28 | } |
29 | |
30 | Chunk RollupTransform::generate() |
31 | { |
32 | if (!consumed_chunks.empty()) |
33 | { |
34 | if (consumed_chunks.size() > 1) |
35 | rollup_chunk = merge(std::move(consumed_chunks), false); |
36 | else |
37 | rollup_chunk = std::move(consumed_chunks.front()); |
38 | |
39 | consumed_chunks.clear(); |
40 | last_removed_key = keys.size(); |
41 | } |
42 | |
43 | auto gen_chunk = std::move(rollup_chunk); |
44 | |
45 | if (last_removed_key) |
46 | { |
47 | --last_removed_key; |
48 | auto key = keys[last_removed_key]; |
49 | |
50 | auto num_rows = gen_chunk.getNumRows(); |
51 | auto columns = gen_chunk.getColumns(); |
52 | columns[key] = columns[key]->cloneEmpty()->cloneResized(num_rows); |
53 | |
54 | Chunks chunks; |
55 | chunks.emplace_back(std::move(columns), num_rows); |
56 | rollup_chunk = merge(std::move(chunks), false); |
57 | } |
58 | |
59 | finalizeChunk(gen_chunk); |
60 | return gen_chunk; |
61 | } |
62 | |
63 | } |
64 | |