1#include <DataStreams/CubeBlockInputStream.h>
2#include <DataStreams/finalizeBlock.h>
3#include <DataTypes/DataTypeAggregateFunction.h>
4#include <Columns/ColumnAggregateFunction.h>
5#include <Columns/FilterDescription.h>
6#include <Common/typeid_cast.h>
7
8namespace DB
9{
10
11namespace ErrorCodes
12{
13 extern const int TOO_MANY_COLUMNS;
14}
15
16CubeBlockInputStream::CubeBlockInputStream(
17 const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
18 keys(params_.keys)
19{
20 if (keys.size() > 30)
21 throw Exception("Too many columns for cube", ErrorCodes::TOO_MANY_COLUMNS);
22
23 children.push_back(input_);
24 Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); };
25 aggregator.setCancellationHook(hook);
26}
27
28
29Block CubeBlockInputStream::getHeader() const
30{
31 Block res = children.at(0)->getHeader();
32 finalizeBlock(res);
33 return res;
34}
35
36
37Block CubeBlockInputStream::readImpl()
38{
39 /** After reading all blocks from input stream,
40 * we will calculate all subsets of columns on next iterations of readImpl
41 * by zeroing columns at positions, where bits are zero in current bitmask.
42 */
43
44 if (!is_data_read)
45 {
46 BlocksList source_blocks;
47 while (auto block = children[0]->read())
48 source_blocks.push_back(block);
49
50 if (source_blocks.empty())
51 return {};
52
53 is_data_read = true;
54 mask = (1 << keys.size()) - 1;
55
56 if (source_blocks.size() > 1)
57 source_block = aggregator.mergeBlocks(source_blocks, false);
58 else
59 source_block = std::move(source_blocks.front());
60
61 zero_block = source_block.cloneEmpty();
62 for (auto key : keys)
63 {
64 auto & current = zero_block.getByPosition(key);
65 current.column = current.column->cloneResized(source_block.rows());
66 }
67
68 auto finalized = source_block;
69 finalizeBlock(finalized);
70 return finalized;
71 }
72
73 if (!mask)
74 return {};
75
76 --mask;
77 auto cube_block = source_block;
78
79 for (size_t i = 0; i < keys.size(); ++i)
80 {
81 if (!((mask >> i) & 1))
82 {
83 size_t pos = keys.size() - i - 1;
84 auto & current = cube_block.getByPosition(keys[pos]);
85 current.column = zero_block.getByPosition(keys[pos]).column;
86 }
87 }
88
89 BlocksList cube_blocks = { cube_block };
90 Block finalized = aggregator.mergeBlocks(cube_blocks, true);
91 return finalized;
92}
93}
94