1#include <Processors/IProcessor.h>
2#include <Interpreters/Aggregator.h>
3#include <Processors/ISimpleTransform.h>
4#include <Processors/Transforms/AggregatingTransform.h>
5#include <Processors/ResizeProcessor.h>
6
7
8namespace DB
9{
10
11/// Has several inputs and a single output.
12/// Reads chunks with partially aggregated data from the inputs, groups them by bucket number
13/// and writes the data of a single bucket as a single chunk.
14class GroupingAggregatedTransform : public IProcessor
15{
16public:
    /// The constructor is defined out of line, so this header does not show how the arguments are used.
    /// Presumably num_inputs_ is the number of inputs mentioned in the class comment - confirm in the implementation file.
17 GroupingAggregatedTransform(const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_);
    /// Returns the constant name "GroupingAggregatedTransform".
18 String getName() const override { return "GroupingAggregatedTransform"; }
19
20 /// Special setting: call it when a single source can return several chunks with the same bucket.
    /// It only raises the expect_several_chunks_for_single_bucket_per_source flag; nothing in this header resets it.
21 void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; }
22
23protected:
    /// Overrides of the base-class interface, implemented out of line.
24 Status prepare() override;
25 void work() override;
26
27private:
    /// NOTE(review): the members below are only declared here. Remarks on their meaning are inferred
    /// from names and types and should be confirmed against the implementation file.
28 size_t num_inputs;
29 AggregatingTransformParamsPtr params;
30
    /// Presumably one entry per input - TODO confirm.
31 std::vector<Int32> last_bucket_number;
    /// Ordered map from an Int32 key (presumably the bucket number - confirm) to a list of chunks.
32 std::map<Int32, Chunks> chunks_map;
33 Chunks overflow_chunks;
34 Chunks single_level_chunks;
    /// Both bucket counters start at 0.
35 Int32 current_bucket = 0;
36 Int32 next_bucket_to_push = 0; /// Always <= current_bucket.
37 bool has_two_level = false;
38
    /// Both flags start as false.
39 bool all_inputs_finished = false;
40 bool read_from_all_inputs = false;
    /// Presumably one flag per input - TODO confirm.
41 std::vector<bool> read_from_input;
42
    /// False by default; set to true by allowSeveralChunksForSingleBucketPerSource().
43 bool expect_several_chunks_for_single_bucket_per_source = false;
44
    /// Private helpers, implemented out of line.
45 void addChunk(Chunk chunk, size_t input);
46 void readFromAllInputs();
47 bool tryPushSingleLevelData();
48 bool tryPushTwoLevelData();
49 bool tryPushOverflowData();
50 void pushData(Chunks chunks, Int32 bucket, bool is_overflows);
51};
52
53/// Merges aggregated data from a single bucket.
54class MergingAggregatedBucketTransform : public ISimpleTransform
55{
56public:
    /// Declared explicit, so an AggregatingTransformParamsPtr is never implicitly converted to this transform.
    /// Defined out of line.
57 explicit MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params);
    /// Returns the constant name "MergingAggregatedBucketTransform".
58 String getName() const override { return "MergingAggregatedBucketTransform"; }
59
60protected:
    /// Override of the base-class hook, implemented out of line. The chunk is taken by non-const
    /// reference, so presumably it is rewritten in place - confirm in the implementation file.
61 void transform(Chunk & chunk) override;
62
63private:
64 AggregatingTransformParamsPtr params;
65};
66
67/// Has several inputs and a single output.
68/// Reads merged buckets with aggregated data from the inputs, sorts them by bucket number and writes them to the output.
69/// Assumption: inputs return chunks with increasing bucket numbers, and there is at most one chunk per bucket.
70class SortingAggregatedTransform : public IProcessor
71{
72public:
    /// Defined out of line; presumably num_inputs is the number of inputs mentioned in the class comment - confirm.
73 SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params);
    /// Returns the constant name "SortingAggregatedTransform".
74 String getName() const override { return "SortingAggregatedTransform"; }
    /// Override of the base-class interface, implemented out of line. Declared in the public section here.
75 Status prepare() override;
76
77private:
    /// NOTE(review): the members below are only declared here. Remarks on their meaning are inferred
    /// from names and types and should be confirmed against the implementation file.
78 size_t num_inputs;
79 AggregatingTransformParamsPtr params;
    /// Presumably one entry per input for each of the two vectors - TODO confirm.
80 std::vector<Int32> last_bucket_number;
81 std::vector<bool> is_input_finished;
    /// Ordered map from an Int32 key (presumably the bucket number - confirm) to a single chunk.
82 std::map<Int32, Chunk> chunks;
83 Chunk overflow_chunk;
84
    /// Private helpers, implemented out of line.
85 bool tryPushChunk();
86 void addChunk(Chunk chunk, size_t from_input);
87};
88
89/// Creates a piece of pipeline which performs memory-efficient merging of partially aggregated data from several sources.
90/// The first processor will have num_inputs inputs, the last one a single output. You should connect them to create the pipeline.
/// All arguments are taken by value and the result is returned as Processors.
/// NOTE(review): num_merging_processors is presumably the number of merging processors placed between
/// the first and the last processor - confirm against the definition, which is not part of this header.
91Processors createMergingAggregatedMemoryEfficientPipe(
92 Block header,
93 AggregatingTransformParamsPtr params,
94 size_t num_inputs,
95 size_t num_merging_processors);
96
97}
98
99