| 1 | #include <Common/ClickHouseRevision.h> |
| 2 | |
| 3 | #include <DataStreams/BlocksListBlockInputStream.h> |
| 4 | #include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h> |
| 5 | #include <DataStreams/AggregatingBlockInputStream.h> |
| 6 | #include <DataStreams/NativeBlockInputStream.h> |
| 7 | |
| 8 | |
/// Profile event counter defined elsewhere; in this file it is incremented by
/// AggregatingBlockInputStream::readImpl() on the path where the aggregator
/// reports that it has temporary files to merge.
namespace ProfileEvents
{
    extern const Event ExternalAggregationMerge;
}
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | |
| 17 | Block AggregatingBlockInputStream::() const |
| 18 | { |
| 19 | return aggregator.getHeader(final); |
| 20 | } |
| 21 | |
| 22 | |
| 23 | Block AggregatingBlockInputStream::readImpl() |
| 24 | { |
| 25 | if (!executed) |
| 26 | { |
| 27 | executed = true; |
| 28 | AggregatedDataVariantsPtr data_variants = std::make_shared<AggregatedDataVariants>(); |
| 29 | |
| 30 | Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); }; |
| 31 | aggregator.setCancellationHook(hook); |
| 32 | |
| 33 | aggregator.execute(children.back(), *data_variants); |
| 34 | |
| 35 | if (!aggregator.hasTemporaryFiles()) |
| 36 | { |
| 37 | ManyAggregatedDataVariants many_data { data_variants }; |
| 38 | impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1); |
| 39 | } |
| 40 | else |
| 41 | { |
| 42 | /** If there are temporary files with partially-aggregated data on the disk, |
| 43 | * then read and merge them, spending the minimum amount of memory. |
| 44 | */ |
| 45 | |
| 46 | ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge); |
| 47 | |
| 48 | if (!isCancelled()) |
| 49 | { |
| 50 | /// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data. |
| 51 | if (data_variants->size()) |
| 52 | aggregator.writeToTemporaryFile(*data_variants); |
| 53 | } |
| 54 | |
| 55 | const auto & files = aggregator.getTemporaryFiles(); |
| 56 | BlockInputStreams input_streams; |
| 57 | for (const auto & file : files.files) |
| 58 | { |
| 59 | temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path())); |
| 60 | input_streams.emplace_back(temporary_inputs.back()->block_in); |
| 61 | } |
| 62 | |
| 63 | LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size " |
| 64 | << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " |
| 65 | << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed." ); |
| 66 | |
| 67 | impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1); |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | if (isCancelledOrThrowIfKilled() || !impl) |
| 72 | return {}; |
| 73 | |
| 74 | return impl->read(); |
| 75 | } |
| 76 | |
| 77 | } |
| 78 | |