1#include <Processors/Transforms/CubeTransform.h>
2#include <Processors/Transforms/TotalsHavingTransform.h>
3
4namespace DB
5{
6
7CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_)
8 : IAccumulatingTransform(std::move(header), params_->getHeader())
9 , params(std::move(params_))
10 , keys(params->params.keys)
11{
12 if (keys.size() >= 8 * sizeof(mask))
13 throw Exception("Too many keys are used for CubeTransform.", ErrorCodes::LOGICAL_ERROR);
14}
15
16Chunk CubeTransform::merge(Chunks && chunks, bool final)
17{
18 BlocksList rollup_blocks;
19 for (auto & chunk : chunks)
20 rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
21
22 auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
23 auto num_rows = rollup_block.rows();
24 return Chunk(rollup_block.getColumns(), num_rows);
25}
26
27void CubeTransform::consume(Chunk chunk)
28{
29 consumed_chunks.emplace_back(std::move(chunk));
30}
31
32Chunk CubeTransform::generate()
33{
34 if (!consumed_chunks.empty())
35 {
36 if (consumed_chunks.size() > 1)
37 cube_chunk = merge(std::move(consumed_chunks), false);
38 else
39 cube_chunk = std::move(consumed_chunks.front());
40
41 consumed_chunks.clear();
42
43 auto num_rows = cube_chunk.getNumRows();
44 mask = (UInt64(1) << keys.size()) - 1;
45
46 current_columns = cube_chunk.getColumns();
47 current_zero_columns.clear();
48 current_zero_columns.reserve(keys.size());
49
50 for (auto key : keys)
51 current_zero_columns.emplace_back(current_columns[key]->cloneEmpty()->cloneResized(num_rows));
52 }
53
54 auto gen_chunk = std::move(cube_chunk);
55
56 if (mask)
57 {
58 --mask;
59
60 auto columns = current_columns;
61 auto size = keys.size();
62 for (size_t i = 0; i < size; ++i)
63 /// Reverse bit order to support previous behaviour.
64 if ((mask & (UInt64(1) << (size - i - 1))) == 0)
65 columns[keys[i]] = current_zero_columns[i];
66
67 Chunks chunks;
68 chunks.emplace_back(std::move(columns), current_columns.front()->size());
69 cube_chunk = merge(std::move(chunks), false);
70 }
71
72 finalizeChunk(gen_chunk);
73 return gen_chunk;
74}
75
76}
77