1 | #pragma once |
2 | |
#include <common/logger_useful.h>
#include <memory>
#include <utility>

#include <Core/ColumnNumbers.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/AlignedBuffer.h>
11 | |
12 | |
13 | namespace DB |
14 | { |
15 | |
16 | class Arena; |
17 | |
/** Merges several sorted streams into one.
  * While merging, each group of consecutive rows with identical values of the primary key
  * (the columns by which the data is sorted) is collapsed into a single row.
  * The data is pre-aggregated during the merge: the states of aggregate functions that
  * correspond to the same value of the primary key are merged together.
  * For columns that are not part of the primary key and do not have the AggregateFunction type,
  * the first value is selected.
  */
24 | class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream |
25 | { |
26 | public: |
27 | AggregatingSortedBlockInputStream( |
28 | const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_); |
29 | |
30 | String getName() const override { return "AggregatingSorted" ; } |
31 | |
32 | bool isSortedOutput() const override { return true; } |
33 | |
34 | protected: |
35 | /// Can return 1 more records than max_block_size. |
36 | Block readImpl() override; |
37 | |
38 | private: |
39 | Logger * log = &Logger::get("AggregatingSortedBlockInputStream" ); |
40 | |
41 | /// Read finished. |
42 | bool finished = false; |
43 | |
44 | struct SimpleAggregateDescription; |
45 | |
46 | /// Columns with which numbers should be aggregated. |
47 | ColumnNumbers column_numbers_to_aggregate; |
48 | ColumnNumbers column_numbers_not_to_aggregate; |
49 | std::vector<ColumnAggregateFunction *> columns_to_aggregate; |
50 | std::vector<SimpleAggregateDescription> columns_to_simple_aggregate; |
51 | |
52 | SharedBlockRowRef current_key; /// The current primary key. |
53 | SharedBlockRowRef next_key; /// The primary key of the next row. |
54 | |
55 | /** We support two different cursors - with Collation and without. |
56 | * Templates are used instead of polymorphic SortCursor and calls to virtual functions. |
57 | */ |
58 | void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue); |
59 | |
60 | /** Extract all states of aggregate functions and merge them with the current group. |
61 | */ |
62 | void addRow(SortCursor & cursor); |
63 | |
64 | /** Insert all values of current row for simple aggregate functions |
65 | */ |
66 | void insertSimpleAggregationResult(MutableColumns & merged_columns); |
67 | |
68 | /// Does SimpleAggregateFunction allocates memory in arena? |
69 | bool allocatesMemoryInArena = false; |
70 | /// Memory pool for SimpleAggregateFunction |
71 | /// (only when allocatesMemoryInArena == true). |
72 | std::unique_ptr<Arena> arena; |
73 | |
74 | /// Stores information for aggregation of SimpleAggregateFunction columns |
75 | struct SimpleAggregateDescription |
76 | { |
77 | /// An aggregate function 'anyLast', 'sum'... |
78 | AggregateFunctionPtr function; |
79 | IAggregateFunction::AddFunc add_function; |
80 | size_t column_number; |
81 | AlignedBuffer state; |
82 | bool created = false; |
83 | |
84 | SimpleAggregateDescription(const AggregateFunctionPtr & function_, const size_t column_number_) : function(function_), column_number(column_number_) |
85 | { |
86 | add_function = function->getAddressOfAddFunction(); |
87 | state.reset(function->sizeOfData(), function->alignOfData()); |
88 | } |
89 | |
90 | void createState() |
91 | { |
92 | if (created) |
93 | return; |
94 | function->create(state.data()); |
95 | created = true; |
96 | } |
97 | |
98 | void destroyState() |
99 | { |
100 | if (!created) |
101 | return; |
102 | function->destroy(state.data()); |
103 | created = false; |
104 | } |
105 | |
106 | /// Explicitly destroy aggregation state if the stream is terminated |
107 | ~SimpleAggregateDescription() |
108 | { |
109 | destroyState(); |
110 | } |
111 | |
112 | SimpleAggregateDescription() = default; |
113 | SimpleAggregateDescription(SimpleAggregateDescription &&) = default; |
114 | SimpleAggregateDescription(const SimpleAggregateDescription &) = delete; |
115 | }; |
116 | }; |
117 | |
118 | } |
119 | |