1 | #include <Common/ClickHouseRevision.h> |
2 | |
3 | #include <DataStreams/BlocksListBlockInputStream.h> |
4 | #include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h> |
5 | #include <DataStreams/AggregatingBlockInputStream.h> |
6 | #include <DataStreams/NativeBlockInputStream.h> |
7 | |
8 | |
/// Profile event counters used by this translation unit; only declared here (extern).
namespace ProfileEvents
{
    /// Incremented by AggregatingBlockInputStream::readImpl() on the branch that merges temporary files.
    extern const Event ExternalAggregationMerge;
}
13 | |
14 | namespace DB |
15 | { |
16 | |
17 | Block AggregatingBlockInputStream::() const |
18 | { |
19 | return aggregator.getHeader(final); |
20 | } |
21 | |
22 | |
23 | Block AggregatingBlockInputStream::readImpl() |
24 | { |
25 | if (!executed) |
26 | { |
27 | executed = true; |
28 | AggregatedDataVariantsPtr data_variants = std::make_shared<AggregatedDataVariants>(); |
29 | |
30 | Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); }; |
31 | aggregator.setCancellationHook(hook); |
32 | |
33 | aggregator.execute(children.back(), *data_variants); |
34 | |
35 | if (!aggregator.hasTemporaryFiles()) |
36 | { |
37 | ManyAggregatedDataVariants many_data { data_variants }; |
38 | impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1); |
39 | } |
40 | else |
41 | { |
42 | /** If there are temporary files with partially-aggregated data on the disk, |
43 | * then read and merge them, spending the minimum amount of memory. |
44 | */ |
45 | |
46 | ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge); |
47 | |
48 | if (!isCancelled()) |
49 | { |
50 | /// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data. |
51 | if (data_variants->size()) |
52 | aggregator.writeToTemporaryFile(*data_variants); |
53 | } |
54 | |
55 | const auto & files = aggregator.getTemporaryFiles(); |
56 | BlockInputStreams input_streams; |
57 | for (const auto & file : files.files) |
58 | { |
59 | temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path())); |
60 | input_streams.emplace_back(temporary_inputs.back()->block_in); |
61 | } |
62 | |
63 | LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size " |
64 | << (files.sum_size_compressed / 1048576.0) << " MiB compressed, " |
65 | << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed." ); |
66 | |
67 | impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1); |
68 | } |
69 | } |
70 | |
71 | if (isCancelledOrThrowIfKilled() || !impl) |
72 | return {}; |
73 | |
74 | return impl->read(); |
75 | } |
76 | |
77 | } |
78 | |