1#pragma once
2
3#include <Interpreters/Aggregator.h>
4#include <IO/ReadBufferFromFile.h>
5#include <Compression/CompressedReadBuffer.h>
6#include <DataStreams/IBlockInputStream.h>
7#include <DataStreams/ParallelInputsProcessor.h>
8#include <DataStreams/TemporaryFileStream.h>
9
10
11namespace DB
12{
13
14
15/** Aggregates several sources in parallel.
16 * Makes aggregation of blocks from different sources independently in different threads, then combines the results.
17 * If final == false, aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations.
18 * This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data).
19 */
20class ParallelAggregatingBlockInputStream : public IBlockInputStream
21{
22public:
23 /** Columns from key_names and arguments of aggregate functions must already be computed.
24 */
25 ParallelAggregatingBlockInputStream(
26 const BlockInputStreams & inputs, const BlockInputStreamPtr & additional_input_at_end,
27 const Aggregator::Params & params_, bool final_, size_t max_threads_, size_t temporary_data_merge_threads_);
28
29 String getName() const override { return "ParallelAggregating"; }
30
31 void cancel(bool kill) override;
32
33 Block getHeader() const override;
34
35protected:
36 /// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor.
37 void readPrefix() override
38 {
39 }
40
41 Block readImpl() override;
42
43private:
44 Aggregator::Params params;
45 Aggregator aggregator;
46 bool final;
47 size_t max_threads;
48 size_t temporary_data_merge_threads;
49
50 size_t keys_size;
51 size_t aggregates_size;
52
53 /** Used if there is a limit on the maximum number of rows in the aggregation,
54 * and if group_by_overflow_mode == ANY.
55 * In this case, new keys are not added to the set, but aggregation is performed only by
56 * keys that have already been added into the set.
57 */
58 bool no_more_keys = false;
59
60 std::atomic<bool> executed {false};
61 std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
62
63 Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");
64
65
66 ManyAggregatedDataVariants many_data;
67 Exceptions exceptions;
68
69 struct ThreadData
70 {
71 size_t src_rows = 0;
72 size_t src_bytes = 0;
73
74 ColumnRawPtrs key_columns;
75 Aggregator::AggregateColumns aggregate_columns;
76
77 ThreadData(size_t keys_size_, size_t aggregates_size_)
78 {
79 key_columns.resize(keys_size_);
80 aggregate_columns.resize(aggregates_size_);
81 }
82 };
83
84 std::vector<ThreadData> threads_data;
85
86
87 struct Handler
88 {
89 Handler(ParallelAggregatingBlockInputStream & parent_)
90 : parent(parent_) {}
91
92 void onBlock(Block & block, size_t thread_num);
93 void onFinishThread(size_t thread_num);
94 void onFinish();
95 void onException(std::exception_ptr & exception, size_t thread_num);
96
97 ParallelAggregatingBlockInputStream & parent;
98 };
99
100 Handler handler;
101 ParallelInputsProcessor<Handler> processor;
102
103
104 void execute();
105
106
107 /** From here we get the finished blocks after the aggregation.
108 */
109 std::unique_ptr<IBlockInputStream> impl;
110};
111
112}
113