1 | #pragma once |
2 | |
#include <queue>
#include <utility>
#include <vector>

#include <Core/Row.h>
#include <Core/ColumnNumbers.h>
#include <Common/AlignedBuffer.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
/// Error codes referenced from this header; `extern` means the value itself is defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
18 | |
19 | |
/** Merges several sorted streams into one.
  * For each group of consecutive rows with identical values of the primary key (the columns by which the data is sorted),
  * collapses them into one row, summing all the numeric columns except the primary key.
  * If the resulting sum is zero in all numeric columns other than the primary key, the row is deleted.
  */
25 | class SummingSortedBlockInputStream : public MergingSortedBlockInputStream |
26 | { |
27 | public: |
28 | SummingSortedBlockInputStream( |
29 | const BlockInputStreams & inputs_, |
30 | const SortDescription & description_, |
31 | /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. |
32 | const Names & column_names_to_sum_, |
33 | size_t max_block_size_); |
34 | |
35 | String getName() const override { return "SummingSorted" ; } |
36 | |
37 | protected: |
38 | /// Can return 1 more records than max_block_size. |
39 | Block readImpl() override; |
40 | |
41 | private: |
42 | Logger * log = &Logger::get("SummingSortedBlockInputStream" ); |
43 | |
44 | /// Read up to the end. |
45 | bool finished = false; |
46 | |
47 | /// Columns with which values should be summed. |
48 | ColumnNumbers column_numbers_not_to_aggregate; |
49 | |
50 | /** A table can have nested tables that are treated in a special way. |
51 | * If the name of the nested table ends in `Map` and it contains at least two columns, |
52 | * satisfying the following criteria: |
53 | * - the first column, as well as all columns whose names end with `ID`, `Key` or `Type` - numeric ((U)IntN, Date, DateTime); |
54 | * (a tuple of such columns will be called `keys`) |
55 | * - the remaining columns are arithmetic ((U)IntN, Float32/64), called (`values`...). |
56 | * This nested table is treated as a mapping (keys...) => (values...) and when merge |
57 | * its rows, the merge of the elements of two sets by (keys...) with summing of corresponding (values...). |
58 | * |
59 | * Example: |
60 | * [(1, 100)] + [(2, 150)] -> [(1, 100), (2, 150)] |
61 | * [(1, 100)] + [(1, 150)] -> [(1, 250)] |
62 | * [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)] |
63 | * [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)] |
64 | * |
65 | * This very unusual functionality is made exclusively for the banner system, |
66 | * is not supposed for use by anyone else, |
67 | * and can be deleted at any time. |
68 | */ |
69 | |
70 | /// Stores aggregation function, state, and columns to be used as function arguments |
71 | struct AggregateDescription |
72 | { |
73 | /// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing. |
74 | AggregateFunctionPtr function; |
75 | IAggregateFunction::AddFunc add_function = nullptr; |
76 | std::vector<size_t> column_numbers; |
77 | MutableColumnPtr merged_column; |
78 | AlignedBuffer state; |
79 | bool created = false; |
80 | |
81 | /// In case when column has type AggregateFunction: use the aggregate function from itself instead of 'function' above. |
82 | bool is_agg_func_type = false; |
83 | |
84 | void init(const char * function_name, const DataTypes & argument_types) |
85 | { |
86 | function = AggregateFunctionFactory::instance().get(function_name, argument_types); |
87 | add_function = function->getAddressOfAddFunction(); |
88 | state.reset(function->sizeOfData(), function->alignOfData()); |
89 | } |
90 | |
91 | void createState() |
92 | { |
93 | if (created) |
94 | return; |
95 | if (is_agg_func_type) |
96 | merged_column->insertDefault(); |
97 | else |
98 | function->create(state.data()); |
99 | created = true; |
100 | } |
101 | |
102 | void destroyState() |
103 | { |
104 | if (!created) |
105 | return; |
106 | if (!is_agg_func_type) |
107 | function->destroy(state.data()); |
108 | created = false; |
109 | } |
110 | |
111 | /// Explicitly destroy aggregation state if the stream is terminated |
112 | ~AggregateDescription() |
113 | { |
114 | destroyState(); |
115 | } |
116 | |
117 | AggregateDescription() = default; |
118 | AggregateDescription(AggregateDescription &&) = default; |
119 | AggregateDescription(const AggregateDescription &) = delete; |
120 | }; |
121 | |
122 | /// Stores numbers of key-columns and value-columns. |
123 | struct MapDescription |
124 | { |
125 | std::vector<size_t> key_col_nums; |
126 | std::vector<size_t> val_col_nums; |
127 | }; |
128 | |
129 | std::vector<AggregateDescription> columns_to_aggregate; |
130 | std::vector<MapDescription> maps_to_sum; |
131 | |
132 | SharedBlockRowRef current_key; /// The current primary key. |
133 | SharedBlockRowRef next_key; /// The primary key of the next row. |
134 | |
135 | Row current_row; |
136 | bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally. |
137 | |
138 | size_t merged_rows = 0; /// Number of rows merged into current result block |
139 | |
140 | /** We support two different cursors - with Collation and without. |
141 | * Templates are used instead of polymorphic SortCursor and calls to virtual functions. |
142 | */ |
143 | void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue); |
144 | |
145 | /// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero". |
146 | void insertCurrentRowIfNeeded(MutableColumns & merged_columns); |
147 | |
148 | /// Returns true if merge result is not empty |
149 | bool mergeMap(const MapDescription & map, Row & row, SortCursor & cursor); |
150 | |
151 | // Add the row under the cursor to the `row`. |
152 | void addRow(SortCursor & cursor); |
153 | }; |
154 | |
155 | } |
156 | |