1#include <DataStreams/BlocksListBlockInputStream.h>
2#include <DataStreams/NativeBlockInputStream.h>
3#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
4#include <DataStreams/ParallelAggregatingBlockInputStream.h>
5#include <Common/ClickHouseRevision.h>
6
7
/// Profile event defined elsewhere. In this file it is incremented in readImpl()
/// when partially-aggregated data has to be merged back from temporary files.
namespace ProfileEvents
{
    extern const Event ExternalAggregationMerge;
}
12
13
14namespace DB
15{
16
17
18ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
19 const BlockInputStreams & inputs, const BlockInputStreamPtr & additional_input_at_end,
20 const Aggregator::Params & params_, bool final_, size_t max_threads_, size_t temporary_data_merge_threads_)
21 : params(params_), aggregator(params),
22 final(final_), max_threads(std::min(inputs.size(), max_threads_)), temporary_data_merge_threads(temporary_data_merge_threads_),
23 keys_size(params.keys_size), aggregates_size(params.aggregates_size),
24 handler(*this), processor(inputs, additional_input_at_end, max_threads, handler)
25{
26 children = inputs;
27 if (additional_input_at_end)
28 children.push_back(additional_input_at_end);
29}
30
31
32Block ParallelAggregatingBlockInputStream::getHeader() const
33{
34 return aggregator.getHeader(final);
35}
36
37
38void ParallelAggregatingBlockInputStream::cancel(bool kill)
39{
40 if (kill)
41 is_killed = true;
42 bool old_val = false;
43 if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
44 return;
45
46 if (!executed)
47 processor.cancel(kill);
48}
49
50
51Block ParallelAggregatingBlockInputStream::readImpl()
52{
53 if (!executed)
54 {
55 Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
56 aggregator.setCancellationHook(hook);
57
58 execute();
59
60 if (isCancelledOrThrowIfKilled())
61 return {};
62
63 if (!aggregator.hasTemporaryFiles())
64 {
65 /** If all partially-aggregated data is in RAM, then merge them in parallel, also in RAM.
66 */
67 impl = aggregator.mergeAndConvertToBlocks(many_data, final, max_threads);
68 }
69 else
70 {
71 /** If there are temporary files with partially-aggregated data on the disk,
72 * then read and merge them, spending the minimum amount of memory.
73 */
74
75 ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge);
76
77 const auto & files = aggregator.getTemporaryFiles();
78 BlockInputStreams input_streams;
79 for (const auto & file : files.files)
80 {
81 temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
82 input_streams.emplace_back(temporary_inputs.back()->block_in);
83 }
84
85 LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
86 << (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
87 << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
88
89 impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
90 input_streams, params, final, temporary_data_merge_threads, temporary_data_merge_threads);
91 }
92
93 executed = true;
94 }
95
96 Block res;
97 if (isCancelledOrThrowIfKilled() || !impl)
98 return res;
99
100 return impl->read();
101}
102
103
104void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
105{
106 parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
107 parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, parent.no_more_keys);
108
109 parent.threads_data[thread_num].src_rows += block.rows();
110 parent.threads_data[thread_num].src_bytes += block.bytes();
111}
112
113void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_num)
114{
115 if (!parent.isCancelled() && parent.aggregator.hasTemporaryFiles())
116 {
117 /// Flush data in the RAM to disk. So it's easier to unite them later.
118 auto & data = *parent.many_data[thread_num];
119
120 if (data.isConvertibleToTwoLevel())
121 data.convertToTwoLevel();
122
123 if (data.size())
124 parent.aggregator.writeToTemporaryFile(data);
125 }
126}
127
128void ParallelAggregatingBlockInputStream::Handler::onFinish()
129{
130 if (!parent.isCancelled() && parent.aggregator.hasTemporaryFiles())
131 {
132 /// It may happen that some data has not yet been flushed,
133 /// because at the time of `onFinishThread` call, no data has been flushed to disk, and then some were.
134 for (auto & data : parent.many_data)
135 {
136 if (data->isConvertibleToTwoLevel())
137 data->convertToTwoLevel();
138
139 if (data->size())
140 parent.aggregator.writeToTemporaryFile(*data);
141 }
142 }
143}
144
145void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num)
146{
147 parent.exceptions[thread_num] = exception;
148 parent.cancel(false);
149}
150
151
152void ParallelAggregatingBlockInputStream::execute()
153{
154 many_data.resize(max_threads);
155 exceptions.resize(max_threads);
156
157 for (size_t i = 0; i < max_threads; ++i)
158 threads_data.emplace_back(keys_size, aggregates_size);
159
160 LOG_TRACE(log, "Aggregating");
161
162 Stopwatch watch;
163
164 for (auto & elem : many_data)
165 elem = std::make_shared<AggregatedDataVariants>();
166
167 processor.process();
168 processor.wait();
169
170 rethrowFirstException(exceptions);
171
172 if (isCancelledOrThrowIfKilled())
173 return;
174
175 double elapsed_seconds = watch.elapsedSeconds();
176
177 size_t total_src_rows = 0;
178 size_t total_src_bytes = 0;
179 for (size_t i = 0; i < max_threads; ++i)
180 {
181 size_t rows = many_data[i]->size();
182 LOG_TRACE(log, std::fixed << std::setprecision(3)
183 << "Aggregated. " << threads_data[i].src_rows << " to " << rows << " rows"
184 << " (from " << threads_data[i].src_bytes / 1048576.0 << " MiB)"
185 << " in " << elapsed_seconds << " sec."
186 << " (" << threads_data[i].src_rows / elapsed_seconds << " rows/sec., "
187 << threads_data[i].src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
188
189 total_src_rows += threads_data[i].src_rows;
190 total_src_bytes += threads_data[i].src_bytes;
191 }
192 LOG_TRACE(log, std::fixed << std::setprecision(3)
193 << "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)"
194 << " in " << elapsed_seconds << " sec."
195 << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
196
197 /// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
198 /// To do this, we pass a block with zero rows to aggregate.
199 if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
200 aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0],
201 threads_data[0].key_columns, threads_data[0].aggregate_columns, no_more_keys);
202}
203
204}
205