| 1 | #pragma once |
| 2 | |
| 3 | #include <Interpreters/Aggregator.h> |
| 4 | #include <IO/ReadBufferFromFile.h> |
| 5 | #include <Compression/CompressedReadBuffer.h> |
| 6 | #include <DataStreams/IBlockInputStream.h> |
| 7 | #include <DataStreams/ParallelInputsProcessor.h> |
| 8 | #include <DataStreams/TemporaryFileStream.h> |
| 9 | |
| 10 | |
| 11 | namespace DB |
| 12 | { |
| 13 | |
| 14 | |
| 15 | /** Aggregates several sources in parallel. |
| 16 | * Makes aggregation of blocks from different sources independently in different threads, then combines the results. |
| 17 | * If final == false, aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations. |
| 18 | * This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data). |
| 19 | */ |
| 20 | class ParallelAggregatingBlockInputStream : public IBlockInputStream |
| 21 | { |
| 22 | public: |
| 23 | /** Columns from key_names and arguments of aggregate functions must already be computed. |
| 24 | */ |
| 25 | ParallelAggregatingBlockInputStream( |
| 26 | const BlockInputStreams & inputs, const BlockInputStreamPtr & additional_input_at_end, |
| 27 | const Aggregator::Params & params_, bool final_, size_t max_threads_, size_t temporary_data_merge_threads_); |
| 28 | |
| 29 | String getName() const override { return "ParallelAggregating" ; } |
| 30 | |
| 31 | void cancel(bool kill) override; |
| 32 | |
| 33 | Block () const override; |
| 34 | |
| 35 | protected: |
| 36 | /// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor. |
| 37 | void readPrefix() override |
| 38 | { |
| 39 | } |
| 40 | |
| 41 | Block readImpl() override; |
| 42 | |
| 43 | private: |
| 44 | Aggregator::Params params; |
| 45 | Aggregator aggregator; |
| 46 | bool final; |
| 47 | size_t max_threads; |
| 48 | size_t temporary_data_merge_threads; |
| 49 | |
| 50 | size_t keys_size; |
| 51 | size_t aggregates_size; |
| 52 | |
| 53 | /** Used if there is a limit on the maximum number of rows in the aggregation, |
| 54 | * and if group_by_overflow_mode == ANY. |
| 55 | * In this case, new keys are not added to the set, but aggregation is performed only by |
| 56 | * keys that have already been added into the set. |
| 57 | */ |
| 58 | bool no_more_keys = false; |
| 59 | |
| 60 | std::atomic<bool> executed {false}; |
| 61 | std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; |
| 62 | |
| 63 | Logger * log = &Logger::get("ParallelAggregatingBlockInputStream" ); |
| 64 | |
| 65 | |
| 66 | ManyAggregatedDataVariants many_data; |
| 67 | Exceptions exceptions; |
| 68 | |
| 69 | struct ThreadData |
| 70 | { |
| 71 | size_t src_rows = 0; |
| 72 | size_t src_bytes = 0; |
| 73 | |
| 74 | ColumnRawPtrs key_columns; |
| 75 | Aggregator::AggregateColumns aggregate_columns; |
| 76 | |
| 77 | ThreadData(size_t keys_size_, size_t aggregates_size_) |
| 78 | { |
| 79 | key_columns.resize(keys_size_); |
| 80 | aggregate_columns.resize(aggregates_size_); |
| 81 | } |
| 82 | }; |
| 83 | |
| 84 | std::vector<ThreadData> threads_data; |
| 85 | |
| 86 | |
| 87 | struct Handler |
| 88 | { |
| 89 | Handler(ParallelAggregatingBlockInputStream & parent_) |
| 90 | : parent(parent_) {} |
| 91 | |
| 92 | void onBlock(Block & block, size_t thread_num); |
| 93 | void onFinishThread(size_t thread_num); |
| 94 | void onFinish(); |
| 95 | void onException(std::exception_ptr & exception, size_t thread_num); |
| 96 | |
| 97 | ParallelAggregatingBlockInputStream & parent; |
| 98 | }; |
| 99 | |
| 100 | Handler handler; |
| 101 | ParallelInputsProcessor<Handler> processor; |
| 102 | |
| 103 | |
| 104 | void execute(); |
| 105 | |
| 106 | |
| 107 | /** From here we get the finished blocks after the aggregation. |
| 108 | */ |
| 109 | std::unique_ptr<IBlockInputStream> impl; |
| 110 | }; |
| 111 | |
| 112 | } |
| 113 | |