1#include <DataStreams/RollupBlockInputStream.h>
2#include <DataStreams/finalizeBlock.h>
3#include <DataTypes/DataTypeAggregateFunction.h>
4#include <Columns/ColumnAggregateFunction.h>
5#include <Columns/FilterDescription.h>
6#include <Common/typeid_cast.h>
7
8namespace DB
9{
10
11RollupBlockInputStream::RollupBlockInputStream(
12 const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
13 keys(params_.keys)
14{
15 children.push_back(input_);
16 Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); };
17 aggregator.setCancellationHook(hook);
18}
19
20
21Block RollupBlockInputStream::getHeader() const
22{
23 Block res = children.at(0)->getHeader();
24 finalizeBlock(res);
25 return res;
26}
27
28
29Block RollupBlockInputStream::readImpl()
30{
31 /** After reading a block from input stream,
32 * we will subsequently roll it up on next iterations of 'readImpl'
33 * by zeroing out every column one-by-one and re-merging a block.
34 */
35
36 if (!is_data_read)
37 {
38 BlocksList source_blocks;
39 while (auto block = children[0]->read())
40 source_blocks.push_back(block);
41
42 if (source_blocks.empty())
43 return {};
44
45 is_data_read = true;
46 if (source_blocks.size() > 1)
47 rollup_block = aggregator.mergeBlocks(source_blocks, false);
48 else
49 rollup_block = std::move(source_blocks.front());
50
51 current_key = keys.size() - 1;
52
53 auto finalized = rollup_block;
54 finalizeBlock(finalized);
55 return finalized;
56 }
57
58 if (current_key < 0)
59 return {};
60
61 auto & current = rollup_block.getByPosition(keys[current_key]);
62 current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
63 --current_key;
64
65 BlocksList rollup_blocks = { rollup_block };
66 rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
67
68 auto finalized = rollup_block;
69 finalizeBlock(finalized);
70 return finalized;
71}
72}
73