#include <Processors/Transforms/AggregatingTransform.h>

#include <Common/ClickHouseRevision.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Processors/ISource.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>

namespace ProfileEvents
{
    extern const Event ExternalAggregationMerge;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
}

/// Convert block to chunk.
/// Adds additional info about aggregation.
static Chunk convertToChunk(const Block & block)
{
    auto info = std::make_shared<AggregatedChunkInfo>();
    info->bucket_num = block.info.bucket_num;
    info->is_overflows = block.info.is_overflows;

    UInt64 num_rows = block.rows();
    Chunk chunk(block.getColumns(), num_rows);
    chunk.setChunkInfo(std::move(info));

    return chunk;
}

namespace
{
    /// Reads chunks from a file in native format. Provides chunks with aggregation info.
    class SourceFromNativeStream : public ISource
    {
    public:
        SourceFromNativeStream(const Block & header, const std::string & path)
            : ISource(header), file_in(path), compressed_in(file_in),
            block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get()))
        {
            block_in->readPrefix();
        }

        String getName() const override { return "SourceFromNativeStream"; }

        Chunk generate() override
        {
            if (!block_in)
                return {};

            auto block = block_in->read();
            if (!block)
            {
                block_in->readSuffix();
                /// Reset the stream to mark this source as exhausted.
                block_in.reset();
                return {};
            }

            return convertToChunk(block);
        }

    private:
        ReadBufferFromFile file_in;
        CompressedReadBuffer compressed_in;
        BlockInputStreamPtr block_in;
    };
}

/// Worker which merges buckets for two-level aggregation.
/// Atomically increments a shared bucket counter and returns the merged result for that bucket.
class ConvertingAggregatedToChunksSource : public ISource
{
public:
    ConvertingAggregatedToChunksSource(
        AggregatingTransformParamsPtr params_,
        ManyAggregatedDataVariantsPtr data_,
        Arena * arena_,
        std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge_)
        : ISource(params_->getHeader())
        , params(std::move(params_))
        , data(std::move(data_))
        , next_bucket_to_merge(std::move(next_bucket_to_merge_))
        , arena(arena_)
    {}

    String getName() const override { return "ConvertingAggregatedToChunksSource"; }

protected:
    Chunk generate() override
    {
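        /// Claim the next bucket atomically. All workers created for one aggregation
        /// share this counter, so each bucket is merged by exactly one of them.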
        UInt32 bucket_num = next_bucket_to_merge->fetch_add(1);

        if (bucket_num >= NUM_BUCKETS)
            return {};

        Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num);

        return convertToChunk(block);
    }

private:
    AggregatingTransformParamsPtr params;
    ManyAggregatedDataVariantsPtr data;
    std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge;
    Arena * arena;

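    /// Two-level aggregation always splits data into 256 buckets,
    /// matching the fan-out of the two-level hash tables.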
    static constexpr UInt32 NUM_BUCKETS = 256;
};

/// Generates chunks with aggregated data.
/// In the single-level case, it merges the data itself.
/// In the two-level case, it creates `ConvertingAggregatedToChunksSource` workers:
///
/// ConvertingAggregatedToChunksSource ->
/// ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform
/// ConvertingAggregatedToChunksSource ->
///
/// Result chunks are guaranteed to be sorted by bucket number.
class ConvertingAggregatedToChunksTransform : public IProcessor
{
public:
    ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_)
        : IProcessor({}, {params_->getHeader()})
        , params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) {}

    String getName() const override { return "ConvertingAggregatedToChunksTransform"; }

    void work() override
    {
        if (data->empty())
        {
            finished = true;
            return;
        }

        if (!is_initialized)
        {
            initialize();
            return;
        }

        if (data->at(0)->isTwoLevel())
        {
            /// In the two-level case, work() only creates the worker sources;
            /// the merging itself happens in ConvertingAggregatedToChunksSource.
            if (inputs.empty())
                createSources();
        }
        else
        {
            mergeSingleLevel();
        }
    }

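    /// Called by the executor after work() has created the worker sources.
    /// Adds an input port for each source and hands the sources over to the
    /// executor so that they run as part of this pipeline.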
    Processors expandPipeline() override
    {
        for (auto & source : processors)
        {
            auto & out = source->getOutputs().front();
            inputs.emplace_back(out.getHeader(), this);
            connect(out, inputs.back());
        }

        return std::move(processors);
    }

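    /// prepare() works as a small state machine: finish when everything was emitted,
    /// let work() run initialize() first, expand the pipeline once sources were
    /// created, push a pending chunk, and otherwise pull from the worker inputs.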
    IProcessor::Status prepare() override
    {
        auto & output = outputs.front();

        if (finished && !has_input)
        {
            output.finish();
            return Status::Finished;
        }

        /// Check if we can output.
        if (output.isFinished())
        {
            for (auto & input : inputs)
                input.close();

            return Status::Finished;
        }

        if (!output.canPush())
            return Status::PortFull;

        if (!is_initialized)
            return Status::Ready;

        if (!processors.empty())
            return Status::ExpandPipeline;

        if (has_input)
            return preparePushToOutput();

        /// Single-level case.
        if (inputs.empty())
            return Status::Ready;

        /// Two-level case.
        return preparePullFromInputs();
    }

private:
    IProcessor::Status preparePushToOutput()
    {
        auto & output = outputs.front();
        output.push(std::move(current_chunk));
        has_input = false;

        if (finished)
        {
            output.finish();
            return Status::Finished;
        }

        return Status::PortFull;
    }

    /// Read from all ready inputs and try to push the chunk for the current bucket.
    IProcessor::Status preparePullFromInputs()
    {
        bool all_inputs_are_finished = true;

        for (auto & input : inputs)
        {
            if (input.isFinished())
                continue;

            all_inputs_are_finished = false;

            input.setNeeded();

            if (input.hasData())
                ready_chunks.emplace_back(input.pull());
        }

        moveReadyChunksToMap();

        if (trySetCurrentChunkFromCurrentBucket())
            return preparePushToOutput();

        if (all_inputs_are_finished)
            throw Exception("All sources have finished before getting enough data in "
                            "ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);

        return Status::NeedData;
    }

private:
    AggregatingTransformParamsPtr params;
    ManyAggregatedDataVariantsPtr data;
    size_t num_threads;

    bool is_initialized = false;
    bool has_input = false;
    bool finished = false;

    Chunk current_chunk;
    Chunks ready_chunks;

    UInt32 current_bucket_num = 0;
    static constexpr Int32 NUM_BUCKETS = 256;

    /// Workers may finish buckets out of order; chunks wait here until the bucket
    /// that must be emitted next arrives.
    std::map<UInt32, Chunk> bucket_to_chunk;

    Processors processors;

    static Int32 getBucketFromChunk(const Chunk & chunk)
    {
        auto & info = chunk.getChunkInfo();
        if (!info)
            throw Exception("Chunk info was not set for chunk in "
                            "ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);

        auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
        if (!agg_info)
            throw Exception("Chunk should have AggregatedChunkInfo in "
                            "ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);

        return agg_info->bucket_num;
    }

    void moveReadyChunksToMap()
    {
        for (auto & chunk : ready_chunks)
        {
            auto bucket = getBucketFromChunk(chunk);

            if (bucket < 0 || bucket >= NUM_BUCKETS)
                throw Exception("Invalid bucket number " + toString(bucket) + " in "
                                "ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);

            if (bucket_to_chunk.count(bucket))
                throw Exception("Found several chunks with the same bucket number in "
                                "ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);

            bucket_to_chunk[bucket] = std::move(chunk);
        }

        ready_chunks.clear();
    }

    void setCurrentChunk(Chunk chunk)
    {
        if (has_input)
            throw Exception("Current chunk was already set in "
                            "ConvertingAggregatedToChunksTransform.", ErrorCodes::LOGICAL_ERROR);

        has_input = true;
        current_chunk = std::move(chunk);
    }

    void initialize()
    {
        is_initialized = true;

        AggregatedDataVariantsPtr & first = data->at(0);

        /// We need at least one arena per thread in the first data item.
        if (num_threads > first->aggregates_pools.size())
        {
            Arenas & first_pool = first->aggregates_pools;
            for (size_t j = first_pool.size(); j < num_threads; ++j)
                first_pool.emplace_back(std::make_shared<Arena>());
        }

        /// Data aggregated without keys (or the overflow row) is not split into
        /// buckets, so it is merged eagerly here.
        if (first->type == AggregatedDataVariants::Type::without_key || params->params.overflow_row)
        {
            params->aggregator.mergeWithoutKeyDataImpl(*data);
            auto block = params->aggregator.prepareBlockAndFillWithoutKey(
                *first, params->final, first->type != AggregatedDataVariants::Type::without_key);

            setCurrentChunk(convertToChunk(block));
        }
    }

    void mergeSingleLevel()
    {
        AggregatedDataVariantsPtr & first = data->at(0);

        if (current_bucket_num > 0 || first->type == AggregatedDataVariants::Type::without_key)
        {
            finished = true;
            return;
        }

        ++current_bucket_num;

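    /// Dispatch on the concrete single-level hash table variant: APPLY_FOR_VARIANTS_SINGLE_LEVEL
    /// expands M(NAME) into one `else if` branch per variant, and the `if (false) {}` below
    /// gives the first expanded branch an `if` to attach to.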
    #define M(NAME) \
        else if (first->type == AggregatedDataVariants::Type::NAME) \
            params->aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(*data);
        if (false) {}
        APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
    #undef M
        else
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

        auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final);

        setCurrentChunk(convertToChunk(block));
        finished = true;
    }

    void createSources()
    {
        AggregatedDataVariantsPtr & first = data->at(0);
        auto next_bucket_to_merge = std::make_shared<std::atomic<UInt32>>(0);

        for (size_t thread = 0; thread < num_threads; ++thread)
        {
            /// Give each worker its own arena (prepared in initialize()), so the
            /// workers can merge buckets in parallel without sharing an allocator.
            Arena * arena = first->aggregates_pools.at(thread).get();
            auto source = std::make_shared<ConvertingAggregatedToChunksSource>(
                params, data, arena, next_bucket_to_merge);

            processors.emplace_back(std::move(source));
        }
    }

    bool trySetCurrentChunkFromCurrentBucket()
    {
        auto it = bucket_to_chunk.find(current_bucket_num);
        if (it != bucket_to_chunk.end())
        {
            setCurrentChunk(std::move(it->second));
            ++current_bucket_num;

            if (current_bucket_num == NUM_BUCKETS)
                finished = true;

            return true;
        }

        return false;
    }
};

AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
    : AggregatingTransform(std::move(header), std::move(params_),
        std::make_unique<ManyAggregatedData>(1), 0, 1, 1)
{
}

AggregatingTransform::AggregatingTransform(
    Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
    size_t current_variant, size_t temporary_data_merge_threads_, size_t max_threads_)
    : IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_))
    , key_columns(params->params.keys_size)
    , aggregate_columns(params->params.aggregates_size)
    , many_data(std::move(many_data_))
    , variants(*many_data->variants[current_variant])
    , max_threads(std::min(many_data->variants.size(), max_threads_))
    , temporary_data_merge_threads(temporary_data_merge_threads_)
{
}

AggregatingTransform::~AggregatingTransform() = default;

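/// The transform has two phases. While consuming, chunks are pulled from the first
/// input and aggregated into `variants`. Once the input is exhausted, initGenerate()
/// (called from work()) builds a generating pipeline, expandPipeline() attaches it,
/// and the aggregated result is read back through the last input and passed through.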
IProcessor::Status AggregatingTransform::prepare()
{
    auto & output = outputs.front();
    /// The last input is the current one. All other inputs should already be closed.
    auto & input = inputs.back();

    /// Check if we can output.
    if (output.isFinished())
    {
        input.close();
        return Status::Finished;
    }

    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Finish data processing and prepare for generating.
    if (is_consume_finished && !is_generate_initialized)
        return Status::Ready;

    if (is_generate_initialized && !is_pipeline_created && !processors.empty())
        return Status::ExpandPipeline;

    /// Only possible while consuming.
    if (read_current_chunk)
        return Status::Ready;

    /// Get a chunk from the input.
    if (input.isFinished())
    {
        if (is_consume_finished)
        {
            output.finish();
            return Status::Finished;
        }
        else
        {
            /// Finish data processing and create another pipe.
            is_consume_finished = true;
            return Status::Ready;
        }
    }

    input.setNeeded();
    if (!input.hasData())
        return Status::NeedData;

    current_chunk = input.pull();
    read_current_chunk = true;

    if (is_consume_finished)
    {
        /// After expandPipeline(), chunks of the generating pipeline are passed through.
        output.push(std::move(current_chunk));
        read_current_chunk = false;
        return Status::PortFull;
    }

    return Status::Ready;
}

void AggregatingTransform::work()
{
    if (is_consume_finished)
        initGenerate();
    else
    {
        consume(std::move(current_chunk));
        read_current_chunk = false;
    }
}

Processors AggregatingTransform::expandPipeline()
{
    /// Attach the generating pipeline built in initGenerate(): the output of its
    /// last processor becomes a new input of this transform.
    auto & out = processors.back()->getOutputs().front();
    inputs.emplace_back(out.getHeader(), this);
    connect(out, inputs.back());
    is_pipeline_created = true;
    return std::move(processors);
}

void AggregatingTransform::consume(Chunk chunk)
{
    UInt64 num_rows = chunk.getNumRows();

    if (num_rows == 0 && params->params.empty_result_for_aggregation_by_empty_set)
        return;

    if (!is_consume_started)
    {
        LOG_TRACE(log, "Aggregating");
        is_consume_started = true;
    }

    src_rows += num_rows;
    src_bytes += chunk.bytes();

    /// executeOnBlock() returns false when aggregation should stop early
    /// (e.g. group_by_overflow_mode = 'break').
    if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys))
        is_consume_finished = true;
}

void AggregatingTransform::initGenerate()
{
    if (is_generate_initialized)
        return;

    is_generate_initialized = true;

    /// If there was no data, and we aggregate without keys, we must return a single row with the result of aggregating an empty set.
    /// To do this, we pass a block with zero rows to aggregate.
    if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
        params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys);

    double elapsed_seconds = watch.elapsedSeconds();
    size_t rows = variants.sizeWithoutOverflowRow();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
        << " in " << elapsed_seconds << " sec."
        << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    if (params->aggregator.hasTemporaryFiles())
    {
        if (variants.isConvertibleToTwoLevel())
            variants.convertToTwoLevel();

        /// Flush data in RAM to disk as well. It's easier than merging on-disk and in-RAM data.
        if (variants.size())
            params->aggregator.writeToTemporaryFile(variants);
    }

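    /// All AggregatingTransforms running in parallel share `many_data`. Only the last
    /// one to finish merges the results and builds the pipeline below; the rest return here.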
    if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size())
        return;
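
    /// If no data was spilled to disk, merge everything in memory
    /// using ConvertingAggregatedToChunksTransform.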
    if (!params->aggregator.hasTemporaryFiles())
    {
        auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
        auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
        processors.emplace_back(std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads));
    }
    else
    {
        /// If there are temporary files with partially-aggregated data on the disk,
        /// then read and merge them, spending the minimum amount of memory.

        ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge);

        if (many_data->variants.size() > 1)
        {
            /// It may happen that some data has not yet been flushed, because when a thread
            /// finished, nothing had been flushed to disk yet, and temporary files appeared later.
            for (auto & cur_variants : many_data->variants)
            {
                if (cur_variants->isConvertibleToTwoLevel())
                    cur_variants->convertToTwoLevel();

                if (cur_variants->size())
                    params->aggregator.writeToTemporaryFile(*cur_variants);
            }
        }

        auto header = params->aggregator.getHeader(false);

        const auto & files = params->aggregator.getTemporaryFiles();
        for (const auto & file : files.files)
            processors.emplace_back(std::make_unique<SourceFromNativeStream>(header, file->path()));

        LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
            << (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
            << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");

        auto pipe = createMergingAggregatedMemoryEfficientPipe(
            header, params, files.files.size(), temporary_data_merge_threads);

        /// Connect each temporary-file source to one input of the memory-efficient merging pipe.
        auto input = pipe.front()->getInputs().begin();
        for (auto & processor : processors)
            connect(processor->getOutputs().front(), *(input++));

        processors.insert(processors.end(), pipe.begin(), pipe.end());
    }
}

}