| 1 | #include <Processors/IProcessor.h> |
| 2 | #include <Interpreters/Aggregator.h> |
| 3 | #include <Processors/ISimpleTransform.h> |
| 4 | #include <Processors/Transforms/AggregatingTransform.h> |
| 5 | #include <Processors/ResizeProcessor.h> |
| 6 | |
| 7 | |
| 8 | namespace DB |
| 9 | { |
| 10 | |
| 11 | /// Has several inputs and single output. |
| 12 | /// Read from inputs chunks with partially aggregated data, group them by bucket number |
| 13 | /// and write data from single bucket as single chunk. |
| 14 | class GroupingAggregatedTransform : public IProcessor |
| 15 | { |
| 16 | public: |
| 17 | GroupingAggregatedTransform(const Block & , size_t num_inputs_, AggregatingTransformParamsPtr params_); |
| 18 | String getName() const override { return "GroupingAggregatedTransform" ; } |
| 19 | |
| 20 | /// Special setting: in case if single source can return several chunks with same bucket. |
| 21 | void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; } |
| 22 | |
| 23 | protected: |
| 24 | Status prepare() override; |
| 25 | void work() override; |
| 26 | |
| 27 | private: |
| 28 | size_t num_inputs; |
| 29 | AggregatingTransformParamsPtr params; |
| 30 | |
| 31 | std::vector<Int32> last_bucket_number; |
| 32 | std::map<Int32, Chunks> chunks_map; |
| 33 | Chunks overflow_chunks; |
| 34 | Chunks single_level_chunks; |
| 35 | Int32 current_bucket = 0; |
| 36 | Int32 next_bucket_to_push = 0; /// Always <= current_bucket. |
| 37 | bool has_two_level = false; |
| 38 | |
| 39 | bool all_inputs_finished = false; |
| 40 | bool read_from_all_inputs = false; |
| 41 | std::vector<bool> read_from_input; |
| 42 | |
| 43 | bool expect_several_chunks_for_single_bucket_per_source = false; |
| 44 | |
| 45 | void addChunk(Chunk chunk, size_t input); |
| 46 | void readFromAllInputs(); |
| 47 | bool tryPushSingleLevelData(); |
| 48 | bool tryPushTwoLevelData(); |
| 49 | bool tryPushOverflowData(); |
| 50 | void pushData(Chunks chunks, Int32 bucket, bool is_overflows); |
| 51 | }; |
| 52 | |
| 53 | /// Merge aggregated data from single bucket. |
| 54 | class MergingAggregatedBucketTransform : public ISimpleTransform |
| 55 | { |
| 56 | public: |
| 57 | explicit MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params); |
| 58 | String getName() const override { return "MergingAggregatedBucketTransform" ; } |
| 59 | |
| 60 | protected: |
| 61 | void transform(Chunk & chunk) override; |
| 62 | |
| 63 | private: |
| 64 | AggregatingTransformParamsPtr params; |
| 65 | }; |
| 66 | |
| 67 | /// Has several inputs and single output. |
| 68 | /// Read from inputs merged bucket with aggregated data, sort them by bucket number and write to output. |
| 69 | /// Presumption: inputs return chunks with increasing bucket number, there is at most one chunk per bucket. |
| 70 | class SortingAggregatedTransform : public IProcessor |
| 71 | { |
| 72 | public: |
| 73 | SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params); |
| 74 | String getName() const override { return "SortingAggregatedTransform" ; } |
| 75 | Status prepare() override; |
| 76 | |
| 77 | private: |
| 78 | size_t num_inputs; |
| 79 | AggregatingTransformParamsPtr params; |
| 80 | std::vector<Int32> last_bucket_number; |
| 81 | std::vector<bool> is_input_finished; |
| 82 | std::map<Int32, Chunk> chunks; |
| 83 | Chunk overflow_chunk; |
| 84 | |
| 85 | bool tryPushChunk(); |
| 86 | void addChunk(Chunk chunk, size_t from_input); |
| 87 | }; |
| 88 | |
| 89 | /// Creates piece of pipeline which performs memory efficient merging of partially aggregated data from several sources. |
| 90 | /// First processor will have num_inputs, last - single output. You should connect them to create pipeline. |
| 91 | Processors createMergingAggregatedMemoryEfficientPipe( |
| 92 | Block , |
| 93 | AggregatingTransformParamsPtr params, |
| 94 | size_t num_inputs, |
| 95 | size_t num_merging_processors); |
| 96 | |
| 97 | } |
| 98 | |
| 99 | |