1#pragma once
2#include <Processors/IAccumulatingTransform.h>
3#include <Interpreters/Aggregator.h>
4#include <IO/ReadBufferFromFile.h>
5#include <Compression/CompressedReadBuffer.h>
6#include <Common/Stopwatch.h>
7
8namespace DB
9{
10
11class AggregatedChunkInfo : public ChunkInfo
12{
13public:
14 bool is_overflows = false;
15 Int32 bucket_num = -1;
16};
17
/// Forward declaration is enough here: only a pointer alias to the stream type is declared,
/// so the full IBlockInputStream definition does not need to be included.
/// NOTE(review): the alias is not used by the declarations in this header; presumably kept for includers.
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
20
21struct AggregatingTransformParams
22{
23 Aggregator::Params params;
24 Aggregator aggregator;
25 bool final;
26
27 AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
28 : params(params_), aggregator(params), final(final_) {}
29
30 Block getHeader() const { return aggregator.getHeader(final); }
31};
32
33struct ManyAggregatedData
34{
35 ManyAggregatedDataVariants variants;
36 std::vector<std::unique_ptr<std::mutex>> mutexes;
37 std::atomic<UInt32> num_finished = 0;
38
39 explicit ManyAggregatedData(size_t num_threads = 0) : variants(num_threads), mutexes(num_threads)
40 {
41 for (auto & elem : variants)
42 elem = std::make_shared<AggregatedDataVariants>();
43
44 for (auto & mut : mutexes)
45 mut = std::make_unique<std::mutex>();
46 }
47};
48
/// Shared-ownership handles passed to AggregatingTransform's constructors.
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
using ManyAggregatedDataPtr = std::shared_ptr<ManyAggregatedData>;
51
/// Processor that aggregates its input with the Aggregator held in AggregatingTransformParams.
/// Implements the IProcessor interface (prepare / work / expandPipeline) directly.
class AggregatingTransform : public IProcessor
{
public:
    /// NOTE(review): the constructor without `many_data` presumably aggregates in a single
    /// thread — confirm in the .cpp.
    AggregatingTransform(Block header, AggregatingTransformParamsPtr params_);

    /// For parallel aggregation: several transforms share `many_data`, and `current_variant`
    /// presumably selects this transform's slot in many_data->variants (confirm in the .cpp).
    AggregatingTransform(Block header, AggregatingTransformParamsPtr params_,
                         ManyAggregatedDataPtr many_data, size_t current_variant,
                         size_t temporary_data_merge_threads, size_t max_threads);
    ~AggregatingTransform() override;

    String getName() const override { return "AggregatingTransform"; }
    Status prepare() override;
    void work() override;
    /// Returns additional processors to splice into the pipeline
    /// (NOTE(review): presumably the readers stored in `processors` — confirm).
    Processors expandPipeline() override;

protected:
    /// NOTE(review): presumably feeds one input chunk into the aggregation state — confirm.
    void consume(Chunk chunk);

private:
    /// Processors that read back the data which was flushed to temporary data files.
    Processors processors;

    AggregatingTransformParamsPtr params;
    Logger * log = &Logger::get("AggregatingTransform");

    /// Column pointers for keys and aggregate-function arguments
    /// (presumably scratch buffers reused across consume() calls).
    ColumnRawPtrs key_columns;
    Aggregator::AggregateColumns aggregate_columns;
    /// NOTE(review): presumably set once no new keys may be added — confirm in the .cpp.
    bool no_more_keys = false;

    ManyAggregatedDataPtr many_data;
    /// Aggregation state this transform writes into. NOTE(review): presumably bound to an element
    /// of `many_data`; since members initialize in declaration order, keep `many_data` declared
    /// before this reference.
    AggregatedDataVariants & variants;
    size_t max_threads = 1;
    size_t temporary_data_merge_threads = 1;

    /// TODO: calculate time only for aggregation.
    Stopwatch watch;

    /// Amount of input seen so far (presumably for logging/statistics).
    UInt64 src_rows = 0;
    UInt64 src_bytes = 0;

    /// State flags, presumably driving the prepare()/work() transitions.
    bool is_generate_initialized = false;
    bool is_consume_finished = false;
    bool is_pipeline_created = false;

    /// Chunk pulled from the input and not yet consumed (presumably handled by work()).
    Chunk current_chunk;
    bool read_current_chunk = false;

    bool is_consume_started = false;

    /// NOTE(review): presumably sets up result generation once input is finished — confirm.
    void initGenerate();
};
104
105}
106