1#include <Common/ClickHouseRevision.h>
2
3#include <DataStreams/BlocksListBlockInputStream.h>
4#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
5#include <DataStreams/AggregatingBlockInputStream.h>
6#include <DataStreams/NativeBlockInputStream.h>
7
8
9namespace ProfileEvents
10{
11 extern const Event ExternalAggregationMerge;
12}
13
14namespace DB
15{
16
17Block AggregatingBlockInputStream::getHeader() const
18{
19 return aggregator.getHeader(final);
20}
21
22
23Block AggregatingBlockInputStream::readImpl()
24{
25 if (!executed)
26 {
27 executed = true;
28 AggregatedDataVariantsPtr data_variants = std::make_shared<AggregatedDataVariants>();
29
30 Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
31 aggregator.setCancellationHook(hook);
32
33 aggregator.execute(children.back(), *data_variants);
34
35 if (!aggregator.hasTemporaryFiles())
36 {
37 ManyAggregatedDataVariants many_data { data_variants };
38 impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
39 }
40 else
41 {
42 /** If there are temporary files with partially-aggregated data on the disk,
43 * then read and merge them, spending the minimum amount of memory.
44 */
45
46 ProfileEvents::increment(ProfileEvents::ExternalAggregationMerge);
47
48 if (!isCancelled())
49 {
50 /// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
51 if (data_variants->size())
52 aggregator.writeToTemporaryFile(*data_variants);
53 }
54
55 const auto & files = aggregator.getTemporaryFiles();
56 BlockInputStreams input_streams;
57 for (const auto & file : files.files)
58 {
59 temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
60 input_streams.emplace_back(temporary_inputs.back()->block_in);
61 }
62
63 LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
64 << (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
65 << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
66
67 impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1);
68 }
69 }
70
71 if (isCancelledOrThrowIfKilled() || !impl)
72 return {};
73
74 return impl->read();
75}
76
77}
78