| 1 | #pragma once | 
|---|---|
| 2 | |
| 3 | #include <Interpreters/Aggregator.h> | 
| 4 | #include <IO/ReadBufferFromFile.h> | 
| 5 | #include <Compression/CompressedReadBuffer.h> | 
| 6 | #include <DataStreams/IBlockInputStream.h> | 
| 7 | #include <DataStreams/TemporaryFileStream.h> | 
| 8 | |
| 9 | |
| 10 | namespace DB | 
| 11 | { | 
| 12 | |
| 13 | |
| 14 | /** Aggregates the stream of blocks using the specified key columns and aggregate functions. | 
| 15 | * Columns with aggregate functions are appended to the end of the block. | 
| 16 | * If final = false, the aggregate functions are not finalized: they are not replaced by their values and instead hold an intermediate state of the calculation. | 
| 17 | * This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data). | 
| 18 | */ | 
| 19 | class AggregatingBlockInputStream : public IBlockInputStream | 
| 20 | { | 
| 21 | public: | 
| 22 | /** Keys are taken from the GROUP BY part of the query. | 
| 23 | * Aggregate functions are searched everywhere in the expression. | 
| 24 | * Columns corresponding to keys and arguments of aggregate functions must already be computed. | 
| 25 | * The constructor copies params_ into this object, builds the aggregator from that copy, stores final_ and appends input to children. */ | 
| 26 | AggregatingBlockInputStream(const BlockInputStreamPtr & input, const Aggregator::Params & params_, bool final_) | 
| 27 | : params(params_), aggregator(params), final(final_) /** aggregator is initialized from the member params, not from the params_ argument */ | 
| 28 | { | 
| 29 | children.push_back(input); /** children is not declared in this class; presumably inherited from IBlockInputStream - confirm */ | 
| 30 | } | 
| 31 | |
| 32 | /** Returns the fixed stream name "Aggregating". */ String getName() const override { return "Aggregating"; } | 
| 33 | |
| 34 | /** Declared here, defined out of line. Presumably describes the structure of the blocks this stream produces - confirm in the implementation. */ Block getHeader() const override; | 
| 35 | |
| 36 | protected: | 
| 37 | /** Declared here, defined out of line. NOTE(review): presumably runs the aggregation and then returns result blocks taken from impl - confirm in the implementation. */ Block readImpl() override; | 
| 38 | |
| 39 | /** Own copy of the aggregation parameters. Keep it declared before aggregator: members are initialized in declaration order and aggregator is built from this member. */ Aggregator::Params params; | 
| 40 | /** Constructed from params in the constructor. */ Aggregator aggregator; | 
| 41 | /** Value of final_ passed to the constructor; see the class comment for the meaning of final = false. */ bool final; | 
| 42 | |
| 43 | /** Starts as false. NOTE(review): presumably set once readImpl has performed the aggregation - confirm in the implementation. */ bool executed = false; | 
| 44 | |
| 45 | /** Owned temporary-file streams. NOTE(review): presumably inputs over data written to temporary files during aggregation - confirm in the implementation. */ std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; | 
| 46 | |
| 47 | /** From here we will get the completed blocks after the aggregation. */ | 
| 48 | std::unique_ptr<IBlockInputStream> impl; | 
| 49 | |
| 50 | /** Logger obtained under the name "AggregatingBlockInputStream". */ Logger * log = &Logger::get( "AggregatingBlockInputStream"); | 
| 51 | }; | 
| 52 | |
| 53 | } | 
| 54 | 
