1 | #include <DataStreams/CubeBlockInputStream.h> |
---|---|
2 | #include <DataStreams/finalizeBlock.h> |
3 | #include <DataTypes/DataTypeAggregateFunction.h> |
4 | #include <Columns/ColumnAggregateFunction.h> |
5 | #include <Columns/FilterDescription.h> |
6 | #include <Common/typeid_cast.h> |
7 | |
8 | namespace DB |
9 | { |
10 | |
/// Error codes referenced by this translation unit; only declared here.
namespace ErrorCodes
{
    extern const int TOO_MANY_COLUMNS;
}
15 | |
16 | CubeBlockInputStream::CubeBlockInputStream( |
17 | const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_), |
18 | keys(params_.keys) |
19 | { |
20 | if (keys.size() > 30) |
21 | throw Exception("Too many columns for cube", ErrorCodes::TOO_MANY_COLUMNS); |
22 | |
23 | children.push_back(input_); |
24 | Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); }; |
25 | aggregator.setCancellationHook(hook); |
26 | } |
27 | |
28 | |
29 | Block CubeBlockInputStream::getHeader() const |
30 | { |
31 | Block res = children.at(0)->getHeader(); |
32 | finalizeBlock(res); |
33 | return res; |
34 | } |
35 | |
36 | |
37 | Block CubeBlockInputStream::readImpl() |
38 | { |
39 | /** After reading all blocks from input stream, |
40 | * we will calculate all subsets of columns on next iterations of readImpl |
41 | * by zeroing columns at positions, where bits are zero in current bitmask. |
42 | */ |
43 | |
44 | if (!is_data_read) |
45 | { |
46 | BlocksList source_blocks; |
47 | while (auto block = children[0]->read()) |
48 | source_blocks.push_back(block); |
49 | |
50 | if (source_blocks.empty()) |
51 | return {}; |
52 | |
53 | is_data_read = true; |
54 | mask = (1 << keys.size()) - 1; |
55 | |
56 | if (source_blocks.size() > 1) |
57 | source_block = aggregator.mergeBlocks(source_blocks, false); |
58 | else |
59 | source_block = std::move(source_blocks.front()); |
60 | |
61 | zero_block = source_block.cloneEmpty(); |
62 | for (auto key : keys) |
63 | { |
64 | auto & current = zero_block.getByPosition(key); |
65 | current.column = current.column->cloneResized(source_block.rows()); |
66 | } |
67 | |
68 | auto finalized = source_block; |
69 | finalizeBlock(finalized); |
70 | return finalized; |
71 | } |
72 | |
73 | if (!mask) |
74 | return {}; |
75 | |
76 | --mask; |
77 | auto cube_block = source_block; |
78 | |
79 | for (size_t i = 0; i < keys.size(); ++i) |
80 | { |
81 | if (!((mask >> i) & 1)) |
82 | { |
83 | size_t pos = keys.size() - i - 1; |
84 | auto & current = cube_block.getByPosition(keys[pos]); |
85 | current.column = zero_block.getByPosition(keys[pos]).column; |
86 | } |
87 | } |
88 | |
89 | BlocksList cube_blocks = { cube_block }; |
90 | Block finalized = aggregator.mergeBlocks(cube_blocks, true); |
91 | return finalized; |
92 | } |
93 | } |
94 |