1 | #pragma once |
2 | #include <Processors/IAccumulatingTransform.h> |
3 | #include <Interpreters/Aggregator.h> |
4 | #include <IO/ReadBufferFromFile.h> |
5 | #include <Compression/CompressedReadBuffer.h> |
6 | #include <Common/Stopwatch.h> |
7 | |
8 | namespace DB |
9 | { |
10 | |
11 | class AggregatedChunkInfo : public ChunkInfo |
12 | { |
13 | public: |
14 | bool is_overflows = false; |
15 | Int32 bucket_num = -1; |
16 | }; |
17 | |
/// Only the name is needed in this header, so the stream type is forward-declared
/// rather than included.
class IBlockInputStream;

/// Shared-ownership handle to an IBlockInputStream.
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
20 | |
21 | struct AggregatingTransformParams |
22 | { |
23 | Aggregator::Params params; |
24 | Aggregator aggregator; |
25 | bool final; |
26 | |
27 | AggregatingTransformParams(const Aggregator::Params & params_, bool final_) |
28 | : params(params_), aggregator(params), final(final_) {} |
29 | |
30 | Block () const { return aggregator.getHeader(final); } |
31 | }; |
32 | |
33 | struct ManyAggregatedData |
34 | { |
35 | ManyAggregatedDataVariants variants; |
36 | std::vector<std::unique_ptr<std::mutex>> mutexes; |
37 | std::atomic<UInt32> num_finished = 0; |
38 | |
39 | explicit ManyAggregatedData(size_t num_threads = 0) : variants(num_threads), mutexes(num_threads) |
40 | { |
41 | for (auto & elem : variants) |
42 | elem = std::make_shared<AggregatedDataVariants>(); |
43 | |
44 | for (auto & mut : mutexes) |
45 | mut = std::make_unique<std::mutex>(); |
46 | } |
47 | }; |
48 | |
49 | using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>; |
50 | using ManyAggregatedDataPtr = std::shared_ptr<ManyAggregatedData>; |
51 | |
52 | class AggregatingTransform : public IProcessor |
53 | { |
54 | public: |
55 | AggregatingTransform(Block , AggregatingTransformParamsPtr params_); |
56 | |
57 | /// For Parallel aggregating. |
58 | AggregatingTransform(Block , AggregatingTransformParamsPtr params_, |
59 | ManyAggregatedDataPtr many_data, size_t current_variant, |
60 | size_t temporary_data_merge_threads, size_t max_threads); |
61 | ~AggregatingTransform() override; |
62 | |
63 | String getName() const override { return "AggregatingTransform" ; } |
64 | Status prepare() override; |
65 | void work() override; |
66 | Processors expandPipeline() override; |
67 | |
68 | protected: |
69 | void consume(Chunk chunk); |
70 | |
71 | private: |
72 | /// To read the data that was flushed into the temporary data file. |
73 | Processors processors; |
74 | |
75 | AggregatingTransformParamsPtr params; |
76 | Logger * log = &Logger::get("AggregatingTransform" ); |
77 | |
78 | ColumnRawPtrs key_columns; |
79 | Aggregator::AggregateColumns aggregate_columns; |
80 | bool no_more_keys = false; |
81 | |
82 | ManyAggregatedDataPtr many_data; |
83 | AggregatedDataVariants & variants; |
84 | size_t max_threads = 1; |
85 | size_t temporary_data_merge_threads = 1; |
86 | |
87 | /// TODO: calculate time only for aggregation. |
88 | Stopwatch watch; |
89 | |
90 | UInt64 src_rows = 0; |
91 | UInt64 src_bytes = 0; |
92 | |
93 | bool is_generate_initialized = false; |
94 | bool is_consume_finished = false; |
95 | bool is_pipeline_created = false; |
96 | |
97 | Chunk current_chunk; |
98 | bool read_current_chunk = false; |
99 | |
100 | bool is_consume_started = false; |
101 | |
102 | void initGenerate(); |
103 | }; |
104 | |
105 | } |
106 | |