1 | #pragma once |
2 | |
3 | #include <Interpreters/Aggregator.h> |
4 | #include <IO/ReadBufferFromFile.h> |
5 | #include <Compression/CompressedReadBuffer.h> |
6 | #include <DataStreams/IBlockInputStream.h> |
7 | #include <DataStreams/ParallelInputsProcessor.h> |
8 | #include <DataStreams/TemporaryFileStream.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
14 | |
15 | /** Aggregates several sources in parallel. |
16 | * Makes aggregation of blocks from different sources independently in different threads, then combines the results. |
17 | * If final == false, aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations. |
18 | * This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data). |
19 | */ |
20 | class ParallelAggregatingBlockInputStream : public IBlockInputStream |
21 | { |
22 | public: |
23 | /** Columns from key_names and arguments of aggregate functions must already be computed. |
24 | */ |
25 | ParallelAggregatingBlockInputStream( |
26 | const BlockInputStreams & inputs, const BlockInputStreamPtr & additional_input_at_end, |
27 | const Aggregator::Params & params_, bool final_, size_t max_threads_, size_t temporary_data_merge_threads_); |
28 | |
29 | String getName() const override { return "ParallelAggregating" ; } |
30 | |
31 | void cancel(bool kill) override; |
32 | |
33 | Block () const override; |
34 | |
35 | protected: |
36 | /// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor. |
37 | void readPrefix() override |
38 | { |
39 | } |
40 | |
41 | Block readImpl() override; |
42 | |
43 | private: |
44 | Aggregator::Params params; |
45 | Aggregator aggregator; |
46 | bool final; |
47 | size_t max_threads; |
48 | size_t temporary_data_merge_threads; |
49 | |
50 | size_t keys_size; |
51 | size_t aggregates_size; |
52 | |
53 | /** Used if there is a limit on the maximum number of rows in the aggregation, |
54 | * and if group_by_overflow_mode == ANY. |
55 | * In this case, new keys are not added to the set, but aggregation is performed only by |
56 | * keys that have already been added into the set. |
57 | */ |
58 | bool no_more_keys = false; |
59 | |
60 | std::atomic<bool> executed {false}; |
61 | std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; |
62 | |
63 | Logger * log = &Logger::get("ParallelAggregatingBlockInputStream" ); |
64 | |
65 | |
66 | ManyAggregatedDataVariants many_data; |
67 | Exceptions exceptions; |
68 | |
69 | struct ThreadData |
70 | { |
71 | size_t src_rows = 0; |
72 | size_t src_bytes = 0; |
73 | |
74 | ColumnRawPtrs key_columns; |
75 | Aggregator::AggregateColumns aggregate_columns; |
76 | |
77 | ThreadData(size_t keys_size_, size_t aggregates_size_) |
78 | { |
79 | key_columns.resize(keys_size_); |
80 | aggregate_columns.resize(aggregates_size_); |
81 | } |
82 | }; |
83 | |
84 | std::vector<ThreadData> threads_data; |
85 | |
86 | |
87 | struct Handler |
88 | { |
89 | Handler(ParallelAggregatingBlockInputStream & parent_) |
90 | : parent(parent_) {} |
91 | |
92 | void onBlock(Block & block, size_t thread_num); |
93 | void onFinishThread(size_t thread_num); |
94 | void onFinish(); |
95 | void onException(std::exception_ptr & exception, size_t thread_num); |
96 | |
97 | ParallelAggregatingBlockInputStream & parent; |
98 | }; |
99 | |
100 | Handler handler; |
101 | ParallelInputsProcessor<Handler> processor; |
102 | |
103 | |
104 | void execute(); |
105 | |
106 | |
107 | /** From here we get the finished blocks after the aggregation. |
108 | */ |
109 | std::unique_ptr<IBlockInputStream> impl; |
110 | }; |
111 | |
112 | } |
113 | |