#pragma once

#include <Processors/IProcessor.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/ResizeProcessor.h>

#include <map>
#include <vector>
6 | |
7 | |
8 | namespace DB |
9 | { |
10 | |
11 | /// Has several inputs and single output. |
12 | /// Read from inputs chunks with partially aggregated data, group them by bucket number |
13 | /// and write data from single bucket as single chunk. |
14 | class GroupingAggregatedTransform : public IProcessor |
15 | { |
16 | public: |
17 | GroupingAggregatedTransform(const Block & , size_t num_inputs_, AggregatingTransformParamsPtr params_); |
18 | String getName() const override { return "GroupingAggregatedTransform" ; } |
19 | |
20 | /// Special setting: in case if single source can return several chunks with same bucket. |
21 | void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; } |
22 | |
23 | protected: |
24 | Status prepare() override; |
25 | void work() override; |
26 | |
27 | private: |
28 | size_t num_inputs; |
29 | AggregatingTransformParamsPtr params; |
30 | |
31 | std::vector<Int32> last_bucket_number; |
32 | std::map<Int32, Chunks> chunks_map; |
33 | Chunks overflow_chunks; |
34 | Chunks single_level_chunks; |
35 | Int32 current_bucket = 0; |
36 | Int32 next_bucket_to_push = 0; /// Always <= current_bucket. |
37 | bool has_two_level = false; |
38 | |
39 | bool all_inputs_finished = false; |
40 | bool read_from_all_inputs = false; |
41 | std::vector<bool> read_from_input; |
42 | |
43 | bool expect_several_chunks_for_single_bucket_per_source = false; |
44 | |
45 | void addChunk(Chunk chunk, size_t input); |
46 | void readFromAllInputs(); |
47 | bool tryPushSingleLevelData(); |
48 | bool tryPushTwoLevelData(); |
49 | bool tryPushOverflowData(); |
50 | void pushData(Chunks chunks, Int32 bucket, bool is_overflows); |
51 | }; |
52 | |
53 | /// Merge aggregated data from single bucket. |
54 | class MergingAggregatedBucketTransform : public ISimpleTransform |
55 | { |
56 | public: |
57 | explicit MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params); |
58 | String getName() const override { return "MergingAggregatedBucketTransform" ; } |
59 | |
60 | protected: |
61 | void transform(Chunk & chunk) override; |
62 | |
63 | private: |
64 | AggregatingTransformParamsPtr params; |
65 | }; |
66 | |
67 | /// Has several inputs and single output. |
68 | /// Read from inputs merged bucket with aggregated data, sort them by bucket number and write to output. |
69 | /// Presumption: inputs return chunks with increasing bucket number, there is at most one chunk per bucket. |
70 | class SortingAggregatedTransform : public IProcessor |
71 | { |
72 | public: |
73 | SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params); |
74 | String getName() const override { return "SortingAggregatedTransform" ; } |
75 | Status prepare() override; |
76 | |
77 | private: |
78 | size_t num_inputs; |
79 | AggregatingTransformParamsPtr params; |
80 | std::vector<Int32> last_bucket_number; |
81 | std::vector<bool> is_input_finished; |
82 | std::map<Int32, Chunk> chunks; |
83 | Chunk overflow_chunk; |
84 | |
85 | bool tryPushChunk(); |
86 | void addChunk(Chunk chunk, size_t from_input); |
87 | }; |
88 | |
89 | /// Creates piece of pipeline which performs memory efficient merging of partially aggregated data from several sources. |
90 | /// First processor will have num_inputs, last - single output. You should connect them to create pipeline. |
91 | Processors createMergingAggregatedMemoryEfficientPipe( |
92 | Block , |
93 | AggregatingTransformParamsPtr params, |
94 | size_t num_inputs, |
95 | size_t num_merging_processors); |
96 | |
97 | } |
98 | |
99 | |