1#pragma once
2
3#include <cstddef>
4#include <memory>
5#include <vector>
6#include <type_traits>
7
8#include <Core/Types.h>
9#include <Core/ColumnNumbers.h>
10#include <Core/Block.h>
11#include <Common/Exception.h>
12#include <Core/Field.h>
13
14
15namespace DB
16{
17
/// Error codes referenced by the default implementations in this header.
/// Declared here so that the header does not depend on another include to provide them.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}

/// Forward declarations: these types are only used by pointer or reference in this header.
class Arena;
class ReadBuffer;
class WriteBuffer;
class IColumn;
class IDataType;
class Context;  /// Taken by const reference in IAggregateFunction::predictValues.

using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;

/// Untyped pointers to the memory that holds an aggregation state.
using AggregateDataPtr = char *;
using ConstAggregateDataPtr = const char *;
29
30
31/** Aggregate functions interface.
32 * Instances of classes with this interface do not contain the data itself for aggregation,
33 * but contain only metadata (description) of the aggregate function,
34 * as well as methods for creating, deleting and working with data.
35 * The data resulting from the aggregation (intermediate computing states) is stored in other objects
36 * (which can be created in some memory pool),
37 * and IAggregateFunction is the external interface for manipulating them.
38 */
39class IAggregateFunction
40{
41public:
42 IAggregateFunction(const DataTypes & argument_types_, const Array & parameters_)
43 : argument_types(argument_types_), parameters(parameters_) {}
44
45 /// Get main function name.
46 virtual String getName() const = 0;
47
48 /// Get the result type.
49 virtual DataTypePtr getReturnType() const = 0;
50
51 /// Get type which will be used for prediction result in case if function is an ML method.
52 virtual DataTypePtr getReturnTypeToPredict() const
53 {
54 throw Exception("Prediction is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
55 }
56
57 virtual ~IAggregateFunction() {}
58
59 /** Data manipulating functions. */
60
61 /** Create empty data for aggregation with `placement new` at the specified location.
62 * You will have to destroy them using the `destroy` method.
63 */
64 virtual void create(AggregateDataPtr place) const = 0;
65
66 /// Delete data for aggregation.
67 virtual void destroy(AggregateDataPtr place) const noexcept = 0;
68
69 /// It is not necessary to delete data.
70 virtual bool hasTrivialDestructor() const = 0;
71
72 /// Get `sizeof` of structure with data.
73 virtual size_t sizeOfData() const = 0;
74
75 /// How the data structure should be aligned. NOTE: Currently not used (structures with aggregation state are put without alignment).
76 virtual size_t alignOfData() const = 0;
77
78 /** Adds a value into aggregation data on which place points to.
79 * columns points to columns containing arguments of aggregation function.
80 * row_num is number of row which should be added.
81 * Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
82 */
83 virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;
84
85 /// Merges state (on which place points to) with other state of current aggregation function.
86 virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;
87
88 /// Serializes state (to transmit it over the network, for example).
89 virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;
90
91 /// Deserializes state. This function is called only for empty (just created) states.
92 virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
93
94 /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
95 virtual bool allocatesMemoryInArena() const
96 {
97 return false;
98 }
99
100 /// Inserts results into a column.
101 virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
102
103 /// Used for machine learning methods. Predict result from trained model.
104 /// Will insert result into `to` column for rows in range [offset, offset + limit).
105 virtual void predictValues(
106 ConstAggregateDataPtr /* place */,
107 IColumn & /*to*/,
108 Block & /*block*/,
109 size_t /*offset*/,
110 size_t /*limit*/,
111 const ColumnNumbers & /*arguments*/,
112 const Context & /*context*/) const
113 {
114 throw Exception("Method predictValues is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
115 }
116
117 /** Returns true for aggregate functions of type -State.
118 * They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another).
119 */
120 virtual bool isState() const { return false; }
121
122 /** The inner loop that uses the function pointer is better than using the virtual function.
123 * The reason is that in the case of virtual functions GCC 5.1.2 generates code,
124 * which, at each iteration of the loop, reloads the function address (the offset value in the virtual function table) from memory to the register.
125 * This gives a performance drop on simple queries around 12%.
126 * After the appearance of better compilers, the code can be removed.
127 */
128 using AddFunc = void (*)(const IAggregateFunction *, AggregateDataPtr, const IColumn **, size_t, Arena *);
129 virtual AddFunc getAddressOfAddFunction() const = 0;
130
131 /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
132 * and do a single call to "addBatch" for devirtualization and inlining.
133 */
134 virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
135
136 /** The same for single place.
137 */
138 virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
139
140 /** In addition to addBatch, this method collects multiple rows of arguments into array "places"
141 * as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
142 * -Array combinator. It might also be used generally to break data dependency when array
143 * "places" contains a large number of same values consecutively.
144 */
145 virtual void addBatchArray(
146 size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena) const = 0;
147
148 const DataTypes & getArgumentTypes() const { return argument_types; }
149 const Array & getParameters() const { return parameters; }
150
151protected:
152 DataTypes argument_types;
153 Array parameters;
154};
155
156
157/// Implement method to obtain an address of 'add' function.
158template <typename Derived>
159class IAggregateFunctionHelper : public IAggregateFunction
160{
161private:
162 static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
163 {
164 static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
165 }
166
167public:
168 IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
169 : IAggregateFunction(argument_types_, parameters_) {}
170
171 AddFunc getAddressOfAddFunction() const override { return &addFree; }
172
173 void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
174 {
175 for (size_t i = 0; i < batch_size; ++i)
176 static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
177 }
178
179 void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
180 {
181 for (size_t i = 0; i < batch_size; ++i)
182 static_cast<const Derived *>(this)->add(place, columns, i, arena);
183 }
184
185 void addBatchArray(
186 size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
187 const override
188 {
189 size_t current_offset = 0;
190 for (size_t i = 0; i < batch_size; ++i)
191 {
192 size_t next_offset = offsets[i];
193 for (size_t j = current_offset; j < next_offset; ++j)
194 static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, j, arena);
195 current_offset = next_offset;
196 }
197 }
198};
199
200
201/// Implements several methods for manipulation with data. T - type of structure with data for aggregation.
202template <typename T, typename Derived>
203class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived>
204{
205protected:
206 using Data = T;
207
208 static Data & data(AggregateDataPtr place) { return *reinterpret_cast<Data*>(place); }
209 static const Data & data(ConstAggregateDataPtr place) { return *reinterpret_cast<const Data*>(place); }
210
211public:
212 IAggregateFunctionDataHelper(const DataTypes & argument_types_, const Array & parameters_)
213 : IAggregateFunctionHelper<Derived>(argument_types_, parameters_) {}
214
215 void create(AggregateDataPtr place) const override
216 {
217 new (place) Data;
218 }
219
220 void destroy(AggregateDataPtr place) const noexcept override
221 {
222 data(place).~Data();
223 }
224
225 bool hasTrivialDestructor() const override
226 {
227 return std::is_trivially_destructible_v<Data>;
228 }
229
230 size_t sizeOfData() const override
231 {
232 return sizeof(Data);
233 }
234
235 /// NOTE: Currently not used (structures with aggregation state are put without alignment).
236 size_t alignOfData() const override
237 {
238 return alignof(Data);
239 }
240};
241
242
/// Shared-ownership pointer to an aggregate function, held through the IAggregateFunction interface.
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
244
245}
246