1 | #include <Processors/Transforms/AggregatingTransform.h> |
2 | |
3 | #include <Common/ClickHouseRevision.h> |
4 | #include <DataStreams/NativeBlockInputStream.h> |
5 | #include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h> |
6 | #include <Processors/ISource.h> |
7 | #include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h> |
8 | |
9 | namespace ProfileEvents |
10 | { |
11 | extern const Event ExternalAggregationMerge; |
12 | } |
13 | |
14 | namespace DB |
15 | { |
16 | |
/// Converts a block to a chunk.
/// Adds aggregation info (bucket number, overflows flag) to the chunk.
19 | static Chunk convertToChunk(const Block & block) |
20 | { |
21 | auto info = std::make_shared<AggregatedChunkInfo>(); |
22 | info->bucket_num = block.info.bucket_num; |
23 | info->is_overflows = block.info.is_overflows; |
24 | |
25 | UInt64 num_rows = block.rows(); |
26 | Chunk chunk(block.getColumns(), num_rows); |
27 | chunk.setChunkInfo(std::move(info)); |
28 | |
29 | return chunk; |
30 | } |
31 | |
32 | namespace |
33 | { |
    /// Reads blocks from a file in native format and provides them as chunks with aggregation info.
35 | class SourceFromNativeStream : public ISource |
36 | { |
37 | public: |
        SourceFromNativeStream(const Block & header, const std::string & path)
39 | : ISource(header), file_in(path), compressed_in(file_in), |
40 | block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) |
41 | { |
42 | block_in->readPrefix(); |
43 | } |
44 | |
        String getName() const override { return "SourceFromNativeStream"; }
46 | |
47 | Chunk generate() override |
48 | { |
49 | if (!block_in) |
50 | return {}; |
51 | |
52 | auto block = block_in->read(); |
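            /// The stream is exhausted: finish it and reset the pointer,
            /// so that subsequent generate() calls return an empty chunk right away.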
53 | if (!block) |
54 | { |
55 | block_in->readSuffix(); |
56 | block_in.reset(); |
57 | return {}; |
58 | } |
59 | |
60 | return convertToChunk(block); |
61 | } |
62 | |
63 | private: |
64 | ReadBufferFromFile file_in; |
65 | CompressedReadBuffer compressed_in; |
66 | BlockInputStreamPtr block_in; |
67 | }; |
68 | } |
69 | |
/// A worker which merges buckets for two-level aggregation.
/// Atomically takes the next bucket number and returns the merged result for it.
72 | class ConvertingAggregatedToChunksSource : public ISource |
73 | { |
74 | public: |
75 | ConvertingAggregatedToChunksSource( |
76 | AggregatingTransformParamsPtr params_, |
77 | ManyAggregatedDataVariantsPtr data_, |
78 | Arena * arena_, |
79 | std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge_) |
80 | : ISource(params_->getHeader()) |
81 | , params(std::move(params_)) |
82 | , data(std::move(data_)) |
83 | , next_bucket_to_merge(std::move(next_bucket_to_merge_)) |
84 | , arena(arena_) |
85 | {} |
86 | |
    String getName() const override { return "ConvertingAggregatedToChunksSource"; }
88 | |
89 | protected: |
90 | Chunk generate() override |
91 | { |
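        /// Take the next bucket number; the counter is shared by all worker sources,
        /// so every bucket is merged by exactly one of them.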
92 | UInt32 bucket_num = next_bucket_to_merge->fetch_add(1); |
93 | |
94 | if (bucket_num >= NUM_BUCKETS) |
95 | return {}; |
96 | |
97 | Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num); |
98 | |
99 | return convertToChunk(block); |
100 | } |
101 | |
102 | private: |
103 | AggregatingTransformParamsPtr params; |
104 | ManyAggregatedDataVariantsPtr data; |
105 | std::shared_ptr<std::atomic<UInt32>> next_bucket_to_merge; |
106 | Arena * arena; |
107 | |
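    /// Matches the number of buckets of the two-level hash tables used by the Aggregator.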
108 | static constexpr UInt32 NUM_BUCKETS = 256; |
109 | }; |
110 | |
111 | /// Generates chunks with aggregated data. |
/// In the single-level case, it merges the data itself.
113 | /// In two-level case, creates `ConvertingAggregatedToChunksSource` workers: |
114 | /// |
115 | /// ConvertingAggregatedToChunksSource -> |
116 | /// ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform |
117 | /// ConvertingAggregatedToChunksSource -> |
118 | /// |
/// Result chunks are guaranteed to be sorted by bucket number.
120 | class ConvertingAggregatedToChunksTransform : public IProcessor |
121 | { |
122 | public: |
123 | ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_) |
124 | : IProcessor({}, {params_->getHeader()}) |
125 | , params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) {} |
126 | |
    String getName() const override { return "ConvertingAggregatedToChunksTransform"; }
128 | |
129 | void work() override |
130 | { |
131 | if (data->empty()) |
132 | { |
133 | finished = true; |
134 | return; |
135 | } |
136 | |
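        /// The first call only initializes: it creates the missing arenas and merges
        /// the data aggregated without key (if any), which will be pushed first.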
137 | if (!is_initialized) |
138 | { |
139 | initialize(); |
140 | return; |
141 | } |
142 | |
143 | if (data->at(0)->isTwoLevel()) |
144 | { |
            /// In the two-level case, we only create the sources; they do the merging themselves.
146 | if (inputs.empty()) |
147 | createSources(); |
148 | } |
149 | else |
150 | { |
151 | mergeSingleLevel(); |
152 | } |
153 | } |
154 | |
155 | Processors expandPipeline() override |
156 | { |
157 | for (auto & source : processors) |
158 | { |
159 | auto & out = source->getOutputs().front(); |
160 | inputs.emplace_back(out.getHeader(), this); |
161 | connect(out, inputs.back()); |
162 | } |
163 | |
164 | return std::move(processors); |
165 | } |
166 | |
167 | IProcessor::Status prepare() override |
168 | { |
169 | auto & output = outputs.front(); |
170 | |
171 | if (finished && !has_input) |
172 | { |
173 | output.finish(); |
174 | return Status::Finished; |
175 | } |
176 | |
        /// Check whether the output can accept data.
178 | if (output.isFinished()) |
179 | { |
180 | for (auto & input : inputs) |
181 | input.close(); |
182 | |
183 | return Status::Finished; |
184 | } |
185 | |
186 | if (!output.canPush()) |
187 | return Status::PortFull; |
188 | |
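        /// Not initialized yet: let work() call initialize() first.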
189 | if (!is_initialized) |
190 | return Status::Ready; |
191 | |
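        /// Sources for the two-level case were created in work(); add them to the pipeline.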
192 | if (!processors.empty()) |
193 | return Status::ExpandPipeline; |
194 | |
195 | if (has_input) |
196 | return preparePushToOutput(); |
197 | |
198 | /// Single level case. |
199 | if (inputs.empty()) |
200 | return Status::Ready; |
201 | |
202 | /// Two-level case. |
203 | return preparePullFromInputs(); |
204 | } |
205 | |
206 | private: |
207 | IProcessor::Status preparePushToOutput() |
208 | { |
209 | auto & output = outputs.front(); |
210 | output.push(std::move(current_chunk)); |
211 | has_input = false; |
212 | |
213 | if (finished) |
214 | { |
215 | output.finish(); |
216 | return Status::Finished; |
217 | } |
218 | |
219 | return Status::PortFull; |
220 | } |
221 | |
    /// Read from all ready inputs and try to push the chunk for the current bucket.
223 | IProcessor::Status preparePullFromInputs() |
224 | { |
225 | bool all_inputs_are_finished = true; |
226 | |
227 | for (auto & input : inputs) |
228 | { |
229 | if (input.isFinished()) |
230 | continue; |
231 | |
232 | all_inputs_are_finished = false; |
233 | |
234 | input.setNeeded(); |
235 | |
236 | if (input.hasData()) |
237 | ready_chunks.emplace_back(input.pull()); |
238 | } |
239 | |
240 | moveReadyChunksToMap(); |
241 | |
242 | if (trySetCurrentChunkFromCurrentBucket()) |
243 | return preparePushToOutput(); |
244 | |
245 | if (all_inputs_are_finished) |
246 | throw Exception("All sources have finished before getting enough data in " |
247 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
248 | |
249 | return Status::NeedData; |
250 | } |
251 | |
252 | private: |
253 | AggregatingTransformParamsPtr params; |
254 | ManyAggregatedDataVariantsPtr data; |
255 | size_t num_threads; |
256 | |
257 | bool is_initialized = false; |
258 | bool has_input = false; |
259 | bool finished = false; |
260 | |
261 | Chunk current_chunk; |
262 | Chunks ready_chunks; |
263 | |
264 | UInt32 current_bucket_num = 0; |
265 | static constexpr Int32 NUM_BUCKETS = 256; |
266 | std::map<UInt32, Chunk> bucket_to_chunk; |
267 | |
268 | Processors processors; |
269 | |
270 | static Int32 getBucketFromChunk(const Chunk & chunk) |
271 | { |
272 | auto & info = chunk.getChunkInfo(); |
273 | if (!info) |
274 | throw Exception("Chunk info was not set for chunk in " |
275 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
276 | |
277 | auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()); |
278 | if (!agg_info) |
279 | throw Exception("Chunk should have AggregatedChunkInfo in " |
280 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
281 | |
282 | return agg_info->bucket_num; |
283 | } |
284 | |
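    /// Move the chunks pulled from the inputs into the bucket_num -> chunk map,
    /// validating bucket numbers and rejecting duplicates.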
285 | void moveReadyChunksToMap() |
286 | { |
287 | for (auto & chunk : ready_chunks) |
288 | { |
289 | auto bucket = getBucketFromChunk(chunk); |
290 | |
291 | if (bucket < 0 || bucket >= NUM_BUCKETS) |
292 | throw Exception("Invalid bucket number " + toString(bucket) + " in " |
293 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
294 | |
295 | if (bucket_to_chunk.count(bucket)) |
296 | throw Exception("Found several chunks with the same bucket number in " |
297 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
298 | |
299 | bucket_to_chunk[bucket] = std::move(chunk); |
300 | } |
301 | |
302 | ready_chunks.clear(); |
303 | } |
304 | |
305 | void setCurrentChunk(Chunk chunk) |
306 | { |
307 | if (has_input) |
308 | throw Exception("Current chunk was already set in " |
309 | "ConvertingAggregatedToChunksTransform." , ErrorCodes::LOGICAL_ERROR); |
310 | |
311 | has_input = true; |
312 | current_chunk = std::move(chunk); |
313 | } |
314 | |
315 | void initialize() |
316 | { |
317 | is_initialized = true; |
318 | |
319 | AggregatedDataVariantsPtr & first = data->at(0); |
320 | |
        /// We need at least one arena per thread in the first data item.
322 | if (num_threads > first->aggregates_pools.size()) |
323 | { |
324 | Arenas & first_pool = first->aggregates_pools; |
325 | for (size_t j = first_pool.size(); j < num_threads; j++) |
326 | first_pool.emplace_back(std::make_shared<Arena>()); |
327 | } |
328 | |
329 | if (first->type == AggregatedDataVariants::Type::without_key || params->params.overflow_row) |
330 | { |
331 | params->aggregator.mergeWithoutKeyDataImpl(*data); |
332 | auto block = params->aggregator.prepareBlockAndFillWithoutKey( |
333 | *first, params->final, first->type != AggregatedDataVariants::Type::without_key); |
334 | |
335 | setCurrentChunk(convertToChunk(block)); |
336 | } |
337 | } |
338 | |
339 | void mergeSingleLevel() |
340 | { |
341 | AggregatedDataVariantsPtr & first = data->at(0); |
342 | |
343 | if (current_bucket_num > 0 || first->type == AggregatedDataVariants::Type::without_key) |
344 | { |
345 | finished = true; |
346 | return; |
347 | } |
348 | |
349 | ++current_bucket_num; |
350 | |
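        /// Dispatch on the concrete single-level variant type; mergeSingleLevelDataImpl
        /// merges the data from all variants into the first one.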
351 | #define M(NAME) \ |
352 | else if (first->type == AggregatedDataVariants::Type::NAME) \ |
353 | params->aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(*data); |
354 | if (false) {} |
355 | APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) |
356 | #undef M |
357 | else |
358 | throw Exception("Unknown aggregated data variant." , ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); |
359 | |
360 | auto block = params->aggregator.prepareBlockAndFillSingleLevel(*first, params->final); |
361 | |
362 | setCurrentChunk(convertToChunk(block)); |
363 | finished = true; |
364 | } |
365 | |
366 | void createSources() |
367 | { |
368 | AggregatedDataVariantsPtr & first = data->at(0); |
369 | auto next_bucket_to_merge = std::make_shared<std::atomic<UInt32>>(0); |
370 | |
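        /// All sources share a single atomic bucket counter, so the buckets
        /// are distributed between the threads dynamically.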
371 | for (size_t thread = 0; thread < num_threads; ++thread) |
372 | { |
373 | Arena * arena = first->aggregates_pools.at(thread).get(); |
374 | auto source = std::make_shared<ConvertingAggregatedToChunksSource>( |
375 | params, data, arena, next_bucket_to_merge); |
376 | |
377 | processors.emplace_back(std::move(source)); |
378 | } |
379 | } |
380 | |
381 | bool trySetCurrentChunkFromCurrentBucket() |
382 | { |
383 | auto it = bucket_to_chunk.find(current_bucket_num); |
384 | if (it != bucket_to_chunk.end()) |
385 | { |
386 | setCurrentChunk(std::move(it->second)); |
387 | ++current_bucket_num; |
388 | |
389 | if (current_bucket_num == NUM_BUCKETS) |
390 | finished = true; |
391 | |
392 | return true; |
393 | } |
394 | |
395 | return false; |
396 | } |
397 | }; |
398 | |
AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
400 | : AggregatingTransform(std::move(header), std::move(params_) |
401 | , std::make_unique<ManyAggregatedData>(1), 0, 1, 1) |
402 | { |
403 | } |
404 | |
405 | AggregatingTransform::AggregatingTransform( |
    Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
407 | size_t current_variant, size_t temporary_data_merge_threads_, size_t max_threads_) |
408 | : IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_)) |
409 | , key_columns(params->params.keys_size) |
410 | , aggregate_columns(params->params.aggregates_size) |
411 | , many_data(std::move(many_data_)) |
412 | , variants(*many_data->variants[current_variant]) |
413 | , max_threads(std::min(many_data->variants.size(), max_threads_)) |
414 | , temporary_data_merge_threads(temporary_data_merge_threads_) |
415 | { |
416 | } |
417 | |
418 | AggregatingTransform::~AggregatingTransform() = default; |
419 | |
420 | IProcessor::Status AggregatingTransform::prepare() |
421 | { |
422 | auto & output = outputs.front(); |
    /// The last input is the current one. All other inputs should already be closed.
424 | auto & input = inputs.back(); |
425 | |
    /// Check whether the output can accept data.
427 | if (output.isFinished()) |
428 | { |
429 | input.close(); |
430 | return Status::Finished; |
431 | } |
432 | |
433 | if (!output.canPush()) |
434 | { |
435 | input.setNotNeeded(); |
436 | return Status::PortFull; |
437 | } |
438 | |
    /// Finish data processing and prepare for generating.
440 | if (is_consume_finished && !is_generate_initialized) |
441 | return Status::Ready; |
442 | |
443 | if (is_generate_initialized && !is_pipeline_created && !processors.empty()) |
444 | return Status::ExpandPipeline; |
445 | |
446 | /// Only possible while consuming. |
447 | if (read_current_chunk) |
448 | return Status::Ready; |
449 | |
450 | /// Get chunk from input. |
451 | if (input.isFinished()) |
452 | { |
453 | if (is_consume_finished) |
454 | { |
455 | output.finish(); |
456 | return Status::Finished; |
457 | } |
458 | else |
459 | { |
460 | /// Finish data processing and create another pipe. |
461 | is_consume_finished = true; |
462 | return Status::Ready; |
463 | } |
464 | } |
465 | |
466 | input.setNeeded(); |
467 | if (!input.hasData()) |
468 | return Status::NeedData; |
469 | |
470 | current_chunk = input.pull(); |
471 | read_current_chunk = true; |
472 | |
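    /// Consuming has already finished; pass the pulled chunk through to the output as is.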
473 | if (is_consume_finished) |
474 | { |
475 | output.push(std::move(current_chunk)); |
476 | read_current_chunk = false; |
477 | return Status::PortFull; |
478 | } |
479 | |
480 | return Status::Ready; |
481 | } |
482 | |
483 | void AggregatingTransform::work() |
484 | { |
485 | if (is_consume_finished) |
486 | initGenerate(); |
487 | else |
488 | { |
489 | consume(std::move(current_chunk)); |
490 | read_current_chunk = false; |
491 | } |
492 | } |
493 | |
494 | Processors AggregatingTransform::expandPipeline() |
495 | { |
496 | auto & out = processors.back()->getOutputs().front(); |
497 | inputs.emplace_back(out.getHeader(), this); |
498 | connect(out, inputs.back()); |
499 | is_pipeline_created = true; |
500 | return std::move(processors); |
501 | } |
502 | |
503 | void AggregatingTransform::consume(Chunk chunk) |
504 | { |
505 | UInt64 num_rows = chunk.getNumRows(); |
506 | |
507 | if (num_rows == 0 && params->params.empty_result_for_aggregation_by_empty_set) |
508 | return; |
509 | |
510 | if (!is_consume_started) |
511 | { |
        LOG_TRACE(log, "Aggregating");
513 | is_consume_started = true; |
514 | } |
515 | |
516 | src_rows += chunk.getNumRows(); |
517 | src_bytes += chunk.bytes(); |
518 | |
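    /// executeOnBlock returns false when aggregation should be finished early
    /// (e.g. the limit on the number of keys is reached in 'break' overflow mode).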
519 | if (!params->aggregator.executeOnBlock(chunk.detachColumns(), num_rows, variants, key_columns, aggregate_columns, no_more_keys)) |
520 | is_consume_finished = true; |
521 | } |
522 | |
523 | void AggregatingTransform::initGenerate() |
524 | { |
525 | if (is_generate_initialized) |
526 | return; |
527 | |
528 | is_generate_initialized = true; |
529 | |
    /// If there was no data, and we aggregate without keys, we must return a single row with the result of the empty aggregation.
531 | /// To do this, we pass a block with zero rows to aggregate. |
532 | if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set) |
533 | params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, no_more_keys); |
534 | |
535 | double elapsed_seconds = watch.elapsedSeconds(); |
536 | size_t rows = variants.sizeWithoutOverflowRow(); |
537 | LOG_TRACE(log, std::fixed << std::setprecision(3) |
538 | << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)" |
539 | << " in " << elapsed_seconds << " sec." |
540 | << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)" ); |
541 | |
542 | if (params->aggregator.hasTemporaryFiles()) |
543 | { |
544 | if (variants.isConvertibleToTwoLevel()) |
545 | variants.convertToTwoLevel(); |
546 | |
        /// Flush the data that is still in RAM to disk as well. It's easier than merging on-disk and in-memory data.
548 | if (variants.size()) |
549 | params->aggregator.writeToTemporaryFile(variants); |
550 | } |
551 | |
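    /// Only the last of the parallel aggregating transforms creates the generating part
    /// of the pipeline; the others just record that they have finished.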
552 | if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size()) |
553 | return; |
554 | |
555 | if (!params->aggregator.hasTemporaryFiles()) |
556 | { |
557 | auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants); |
558 | auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data)); |
559 | processors.emplace_back(std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads)); |
560 | } |
561 | else |
562 | { |
563 | /// If there are temporary files with partially-aggregated data on the disk, |
564 | /// then read and merge them, spending the minimum amount of memory. |
565 | |
566 | ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge); |
567 | |
568 | if (many_data->variants.size() > 1) |
569 | { |
            /// It may happen that some data has not been flushed to disk yet,
            /// because at the time a thread finished, nothing had been flushed, and data was written only later.
572 | for (auto & cur_variants : many_data->variants) |
573 | { |
574 | if (cur_variants->isConvertibleToTwoLevel()) |
575 | cur_variants->convertToTwoLevel(); |
576 | |
577 | if (cur_variants->size()) |
578 | params->aggregator.writeToTemporaryFile(*cur_variants); |
579 | } |
580 | } |
581 | |
        auto header = params->aggregator.getHeader(false);
583 | |
584 | const auto & files = params->aggregator.getTemporaryFiles(); |
586 | for (const auto & file : files.files) |
587 | processors.emplace_back(std::make_unique<SourceFromNativeStream>(header, file->path())); |
588 | |
589 | LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size " |
590 | << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " |
            << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
592 | |
593 | auto pipe = createMergingAggregatedMemoryEfficientPipe( |
594 | header, params, files.files.size(), temporary_data_merge_threads); |
595 | |
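        /// Connect every temporary-file source to an input of the memory-efficient merging pipe.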
596 | auto input = pipe.front()->getInputs().begin(); |
597 | for (auto & processor : processors) |
598 | connect(processor->getOutputs().front(), *(input++)); |
599 | |
600 | processors.insert(processors.end(), pipe.begin(), pipe.end()); |
601 | } |
602 | } |
603 | |
604 | } |
605 | |