1#include <Columns/ColumnsNumber.h>
2
3#include <DataStreams/MergingAggregatedBlockInputStream.h>
4
5
6namespace DB
7{
8
9Block MergingAggregatedBlockInputStream::getHeader() const
10{
11 return aggregator.getHeader(final);
12}
13
14
15Block MergingAggregatedBlockInputStream::readImpl()
16{
17 if (!executed)
18 {
19 executed = true;
20 AggregatedDataVariants data_variants;
21
22 Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
23 aggregator.setCancellationHook(hook);
24
25 aggregator.mergeStream(children.back(), data_variants, max_threads);
26 blocks = aggregator.convertToBlocks(data_variants, final, max_threads);
27 it = blocks.begin();
28 }
29
30 Block res;
31 if (isCancelledOrThrowIfKilled() || it == blocks.end())
32 return res;
33
34 res = std::move(*it);
35 ++it;
36
37 return res;
38}
39
40
41}
42