1#include <Processors/Transforms/RollupTransform.h>
2#include <Processors/Transforms/TotalsHavingTransform.h>
3
4namespace DB
5{
6
7RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_)
8 : IAccumulatingTransform(std::move(header), params_->getHeader())
9 , params(std::move(params_))
10 , keys(params->params.keys)
11{
12}
13
14void RollupTransform::consume(Chunk chunk)
15{
16 consumed_chunks.emplace_back(std::move(chunk));
17}
18
19Chunk RollupTransform::merge(Chunks && chunks, bool final)
20{
21 BlocksList rollup_blocks;
22 for (auto & chunk : chunks)
23 rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
24
25 auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
26 auto num_rows = rollup_block.rows();
27 return Chunk(rollup_block.getColumns(), num_rows);
28}
29
30Chunk RollupTransform::generate()
31{
32 if (!consumed_chunks.empty())
33 {
34 if (consumed_chunks.size() > 1)
35 rollup_chunk = merge(std::move(consumed_chunks), false);
36 else
37 rollup_chunk = std::move(consumed_chunks.front());
38
39 consumed_chunks.clear();
40 last_removed_key = keys.size();
41 }
42
43 auto gen_chunk = std::move(rollup_chunk);
44
45 if (last_removed_key)
46 {
47 --last_removed_key;
48 auto key = keys[last_removed_key];
49
50 auto num_rows = gen_chunk.getNumRows();
51 auto columns = gen_chunk.getColumns();
52 columns[key] = columns[key]->cloneEmpty()->cloneResized(num_rows);
53
54 Chunks chunks;
55 chunks.emplace_back(std::move(columns), num_rows);
56 rollup_chunk = merge(std::move(chunks), false);
57 }
58
59 finalizeChunk(gen_chunk);
60 return gen_chunk;
61}
62
63}
64