1#pragma once
2
#include <utility>

#include <Core/Row.h>
#include <Core/ColumnNumbers.h>
#include <Common/AlignedBuffer.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
9
10
11namespace DB
12{
13
14namespace ErrorCodes
15{
16 extern const int LOGICAL_ERROR;
17}
18
19
/** Merges several sorted streams into one.
  * For each group of consecutive rows with identical values of the primary key (the columns by which the data is sorted),
  * collapses them into one row, summing all the numeric columns except the primary key.
  * If the result is zero in all summed columns, the row is deleted.
  */
25class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
26{
27public:
28 SummingSortedBlockInputStream(
29 const BlockInputStreams & inputs_,
30 const SortDescription & description_,
31 /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
32 const Names & column_names_to_sum_,
33 size_t max_block_size_);
34
35 String getName() const override { return "SummingSorted"; }
36
37protected:
38 /// Can return 1 more records than max_block_size.
39 Block readImpl() override;
40
41private:
42 Logger * log = &Logger::get("SummingSortedBlockInputStream");
43
44 /// Read up to the end.
45 bool finished = false;
46
47 /// Columns with which values should be summed.
48 ColumnNumbers column_numbers_not_to_aggregate;
49
50 /** A table can have nested tables that are treated in a special way.
51 * If the name of the nested table ends in `Map` and it contains at least two columns,
52 * satisfying the following criteria:
53 * - the first column, as well as all columns whose names end with `ID`, `Key` or `Type` - numeric ((U)IntN, Date, DateTime);
54 * (a tuple of such columns will be called `keys`)
55 * - the remaining columns are arithmetic ((U)IntN, Float32/64), called (`values`...).
56 * This nested table is treated as a mapping (keys...) => (values...) and when merge
57 * its rows, the merge of the elements of two sets by (keys...) with summing of corresponding (values...).
58 *
59 * Example:
60 * [(1, 100)] + [(2, 150)] -> [(1, 100), (2, 150)]
61 * [(1, 100)] + [(1, 150)] -> [(1, 250)]
62 * [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
63 * [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
64 *
65 * This very unusual functionality is made exclusively for the banner system,
66 * is not supposed for use by anyone else,
67 * and can be deleted at any time.
68 */
69
70 /// Stores aggregation function, state, and columns to be used as function arguments
71 struct AggregateDescription
72 {
73 /// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing.
74 AggregateFunctionPtr function;
75 IAggregateFunction::AddFunc add_function = nullptr;
76 std::vector<size_t> column_numbers;
77 MutableColumnPtr merged_column;
78 AlignedBuffer state;
79 bool created = false;
80
81 /// In case when column has type AggregateFunction: use the aggregate function from itself instead of 'function' above.
82 bool is_agg_func_type = false;
83
84 void init(const char * function_name, const DataTypes & argument_types)
85 {
86 function = AggregateFunctionFactory::instance().get(function_name, argument_types);
87 add_function = function->getAddressOfAddFunction();
88 state.reset(function->sizeOfData(), function->alignOfData());
89 }
90
91 void createState()
92 {
93 if (created)
94 return;
95 if (is_agg_func_type)
96 merged_column->insertDefault();
97 else
98 function->create(state.data());
99 created = true;
100 }
101
102 void destroyState()
103 {
104 if (!created)
105 return;
106 if (!is_agg_func_type)
107 function->destroy(state.data());
108 created = false;
109 }
110
111 /// Explicitly destroy aggregation state if the stream is terminated
112 ~AggregateDescription()
113 {
114 destroyState();
115 }
116
117 AggregateDescription() = default;
118 AggregateDescription(AggregateDescription &&) = default;
119 AggregateDescription(const AggregateDescription &) = delete;
120 };
121
122 /// Stores numbers of key-columns and value-columns.
123 struct MapDescription
124 {
125 std::vector<size_t> key_col_nums;
126 std::vector<size_t> val_col_nums;
127 };
128
129 std::vector<AggregateDescription> columns_to_aggregate;
130 std::vector<MapDescription> maps_to_sum;
131
132 SharedBlockRowRef current_key; /// The current primary key.
133 SharedBlockRowRef next_key; /// The primary key of the next row.
134
135 Row current_row;
136 bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.
137
138 size_t merged_rows = 0; /// Number of rows merged into current result block
139
140 /** We support two different cursors - with Collation and without.
141 * Templates are used instead of polymorphic SortCursor and calls to virtual functions.
142 */
143 void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
144
145 /// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero".
146 void insertCurrentRowIfNeeded(MutableColumns & merged_columns);
147
148 /// Returns true if merge result is not empty
149 bool mergeMap(const MapDescription & map, Row & row, SortCursor & cursor);
150
151 // Add the row under the cursor to the `row`.
152 void addRow(SortCursor & cursor);
153};
154
155}
156