1 | #pragma once |
2 | |
3 | #include <Columns/ColumnVector.h> |
4 | #include <Columns/ColumnsCommon.h> |
5 | #include <Columns/ColumnsNumber.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include <DataTypes/DataTypesNumber.h> |
8 | #include <DataTypes/DataTypeTuple.h> |
9 | #include <DataTypes/DataTypeArray.h> |
10 | #include "IAggregateFunction.h" |
11 | |
12 | namespace DB |
13 | { |
/// Error codes referenced by this header. Only declared here; the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    extern const int BAD_CAST;
    /// Used by AggregateFunctionMLMethod::create(); declared explicitly so that this header
    /// does not depend on some other include happening to declare it.
    extern const int LOGICAL_ERROR;
}
20 | |
21 | /** |
22 | GradientComputer class computes gradient according to its loss function |
23 | */ |
24 | class IGradientComputer |
25 | { |
26 | public: |
27 | IGradientComputer() {} |
28 | |
29 | virtual ~IGradientComputer() = default; |
30 | |
31 | /// Adds computed gradient in new point (weights, bias) to batch_gradient |
32 | virtual void compute( |
33 | std::vector<Float64> & batch_gradient, |
34 | const std::vector<Float64> & weights, |
35 | Float64 bias, |
36 | Float64 l2_reg_coef, |
37 | Float64 target, |
38 | const IColumn ** columns, |
39 | size_t row_num) = 0; |
40 | |
41 | virtual void predict( |
42 | ColumnVector<Float64>::Container & container, |
43 | Block & block, |
44 | size_t offset, |
45 | size_t limit, |
46 | const ColumnNumbers & arguments, |
47 | const std::vector<Float64> & weights, |
48 | Float64 bias, |
49 | const Context & context) const = 0; |
50 | }; |
51 | |
52 | |
53 | class LinearRegression : public IGradientComputer |
54 | { |
55 | public: |
56 | LinearRegression() {} |
57 | |
58 | void compute( |
59 | std::vector<Float64> & batch_gradient, |
60 | const std::vector<Float64> & weights, |
61 | Float64 bias, |
62 | Float64 l2_reg_coef, |
63 | Float64 target, |
64 | const IColumn ** columns, |
65 | size_t row_num) override; |
66 | |
67 | void predict( |
68 | ColumnVector<Float64>::Container & container, |
69 | Block & block, |
70 | size_t offset, |
71 | size_t limit, |
72 | const ColumnNumbers & arguments, |
73 | const std::vector<Float64> & weights, |
74 | Float64 bias, |
75 | const Context & context) const override; |
76 | }; |
77 | |
78 | |
79 | class LogisticRegression : public IGradientComputer |
80 | { |
81 | public: |
82 | LogisticRegression() {} |
83 | |
84 | void compute( |
85 | std::vector<Float64> & batch_gradient, |
86 | const std::vector<Float64> & weights, |
87 | Float64 bias, |
88 | Float64 l2_reg_coef, |
89 | Float64 target, |
90 | const IColumn ** columns, |
91 | size_t row_num) override; |
92 | |
93 | void predict( |
94 | ColumnVector<Float64>::Container & container, |
95 | Block & block, |
96 | size_t offset, |
97 | size_t limit, |
98 | const ColumnNumbers & arguments, |
99 | const std::vector<Float64> & weights, |
100 | Float64 bias, |
101 | const Context & context) const override; |
102 | }; |
103 | |
104 | |
105 | /** |
106 | * IWeightsUpdater class defines the way to update current weights |
107 | * and uses GradientComputer class on each iteration |
108 | */ |
109 | class IWeightsUpdater |
110 | { |
111 | public: |
112 | virtual ~IWeightsUpdater() = default; |
113 | |
114 | /// Calls GradientComputer to update current mini-batch |
115 | virtual void add_to_batch( |
116 | std::vector<Float64> & batch_gradient, |
117 | IGradientComputer & gradient_computer, |
118 | const std::vector<Float64> & weights, |
119 | Float64 bias, |
120 | Float64 l2_reg_coef, |
121 | Float64 target, |
122 | const IColumn ** columns, |
123 | size_t row_num); |
124 | |
125 | /// Updates current weights according to the gradient from the last mini-batch |
126 | virtual void update( |
127 | UInt64 batch_size, |
128 | std::vector<Float64> & weights, |
129 | Float64 & bias, |
130 | Float64 learning_rate, |
131 | const std::vector<Float64> & gradient) = 0; |
132 | |
133 | /// Used during the merge of two states |
134 | virtual void merge(const IWeightsUpdater &, Float64, Float64) {} |
135 | |
136 | /// Used for serialization when necessary |
137 | virtual void write(WriteBuffer &) const {} |
138 | |
139 | /// Used for serialization when necessary |
140 | virtual void read(ReadBuffer &) {} |
141 | }; |
142 | |
143 | |
144 | class StochasticGradientDescent : public IWeightsUpdater |
145 | { |
146 | public: |
147 | void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override; |
148 | }; |
149 | |
150 | |
151 | class Momentum : public IWeightsUpdater |
152 | { |
153 | public: |
154 | Momentum() {} |
155 | |
156 | Momentum(Float64 alpha) : alpha_(alpha) {} |
157 | |
158 | void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override; |
159 | |
160 | virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override; |
161 | |
162 | void write(WriteBuffer & buf) const override; |
163 | |
164 | void read(ReadBuffer & buf) override; |
165 | |
166 | private: |
167 | Float64 alpha_{0.1}; |
168 | std::vector<Float64> accumulated_gradient; |
169 | }; |
170 | |
171 | |
172 | class Nesterov : public IWeightsUpdater |
173 | { |
174 | public: |
175 | Nesterov() {} |
176 | |
177 | Nesterov(Float64 alpha) : alpha_(alpha) {} |
178 | |
179 | void add_to_batch( |
180 | std::vector<Float64> & batch_gradient, |
181 | IGradientComputer & gradient_computer, |
182 | const std::vector<Float64> & weights, |
183 | Float64 bias, |
184 | Float64 l2_reg_coef, |
185 | Float64 target, |
186 | const IColumn ** columns, |
187 | size_t row_num) override; |
188 | |
189 | void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override; |
190 | |
191 | virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override; |
192 | |
193 | void write(WriteBuffer & buf) const override; |
194 | |
195 | void read(ReadBuffer & buf) override; |
196 | |
197 | private: |
198 | const Float64 alpha_ = 0.9; |
199 | std::vector<Float64> accumulated_gradient; |
200 | }; |
201 | |
202 | |
203 | class Adam : public IWeightsUpdater |
204 | { |
205 | public: |
206 | Adam() |
207 | { |
208 | beta1_powered_ = beta1_; |
209 | beta2_powered_ = beta2_; |
210 | } |
211 | |
212 | void add_to_batch( |
213 | std::vector<Float64> & batch_gradient, |
214 | IGradientComputer & gradient_computer, |
215 | const std::vector<Float64> & weights, |
216 | Float64 bias, |
217 | Float64 l2_reg_coef, |
218 | Float64 target, |
219 | const IColumn ** columns, |
220 | size_t row_num) override; |
221 | |
222 | void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override; |
223 | |
224 | virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override; |
225 | |
226 | void write(WriteBuffer & buf) const override; |
227 | |
228 | void read(ReadBuffer & buf) override; |
229 | |
230 | private: |
231 | /// beta1 and beta2 hyperparameters have such recommended values |
232 | const Float64 beta1_ = 0.9; |
233 | const Float64 beta2_ = 0.999; |
234 | const Float64 eps_ = 0.000001; |
235 | Float64 beta1_powered_; |
236 | Float64 beta2_powered_; |
237 | |
238 | std::vector<Float64> average_gradient; |
239 | std::vector<Float64> average_squared_gradient; |
240 | }; |
241 | |
242 | |
243 | /** LinearModelData is a class which manages current state of learning |
244 | */ |
245 | class LinearModelData |
246 | { |
247 | public: |
248 | LinearModelData() {} |
249 | |
250 | LinearModelData( |
251 | Float64 learning_rate_, |
252 | Float64 l2_reg_coef_, |
253 | UInt64 param_num_, |
254 | UInt64 batch_capacity_, |
255 | std::shared_ptr<IGradientComputer> gradient_computer_, |
256 | std::shared_ptr<IWeightsUpdater> weights_updater_); |
257 | |
258 | void add(const IColumn ** columns, size_t row_num); |
259 | |
260 | void merge(const LinearModelData & rhs); |
261 | |
262 | void write(WriteBuffer & buf) const; |
263 | |
264 | void read(ReadBuffer & buf); |
265 | |
266 | void predict( |
267 | ColumnVector<Float64>::Container & container, |
268 | Block & block, |
269 | size_t offset, |
270 | size_t limit, |
271 | const ColumnNumbers & arguments, |
272 | const Context & context) const; |
273 | |
274 | void returnWeights(IColumn & to) const; |
275 | private: |
276 | std::vector<Float64> weights; |
277 | Float64 bias{0.0}; |
278 | |
279 | Float64 learning_rate; |
280 | Float64 l2_reg_coef; |
281 | UInt64 batch_capacity; |
282 | |
283 | UInt64 iter_num = 0; |
284 | std::vector<Float64> gradient_batch; |
285 | UInt64 batch_size; |
286 | |
287 | std::shared_ptr<IGradientComputer> gradient_computer; |
288 | std::shared_ptr<IWeightsUpdater> weights_updater; |
289 | |
290 | /** The function is called when we want to flush current batch and update our weights |
291 | */ |
292 | void update_state(); |
293 | }; |
294 | |
295 | |
296 | template < |
297 | /// Implemented Machine Learning method |
298 | typename Data, |
299 | /// Name of the method |
300 | typename Name> |
301 | class AggregateFunctionMLMethod final : public IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>> |
302 | { |
303 | public: |
304 | String getName() const override { return Name::name; } |
305 | |
306 | explicit AggregateFunctionMLMethod( |
307 | UInt32 param_num_, |
308 | std::unique_ptr<IGradientComputer> gradient_computer_, |
309 | std::string weights_updater_name_, |
310 | Float64 learning_rate_, |
311 | Float64 l2_reg_coef_, |
312 | UInt64 batch_size_, |
313 | const DataTypes & arguments_types, |
314 | const Array & params) |
315 | : IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>(arguments_types, params) |
316 | , param_num(param_num_) |
317 | , learning_rate(learning_rate_) |
318 | , l2_reg_coef(l2_reg_coef_) |
319 | , batch_size(batch_size_) |
320 | , gradient_computer(std::move(gradient_computer_)) |
321 | , weights_updater_name(std::move(weights_updater_name_)) |
322 | { |
323 | } |
324 | |
325 | /// This function is called when SELECT linearRegression(...) is called |
326 | DataTypePtr getReturnType() const override |
327 | { |
328 | return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>()); |
329 | } |
330 | |
331 | /// This function is called from evalMLMethod function for correct predictValues call |
332 | DataTypePtr getReturnTypeToPredict() const override |
333 | { |
334 | return std::make_shared<DataTypeNumber<Float64>>(); |
335 | } |
336 | |
337 | void create(AggregateDataPtr place) const override |
338 | { |
339 | std::shared_ptr<IWeightsUpdater> new_weights_updater; |
340 | if (weights_updater_name == "SGD" ) |
341 | new_weights_updater = std::make_shared<StochasticGradientDescent>(); |
342 | else if (weights_updater_name == "Momentum" ) |
343 | new_weights_updater = std::make_shared<Momentum>(); |
344 | else if (weights_updater_name == "Nesterov" ) |
345 | new_weights_updater = std::make_shared<Nesterov>(); |
346 | else if (weights_updater_name == "Adam" ) |
347 | new_weights_updater = std::make_shared<Adam>(); |
348 | else |
349 | throw Exception("Illegal name of weights updater (should have been checked earlier)" , ErrorCodes::LOGICAL_ERROR); |
350 | |
351 | new (place) Data(learning_rate, l2_reg_coef, param_num, batch_size, gradient_computer, new_weights_updater); |
352 | } |
353 | |
354 | void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override |
355 | { |
356 | this->data(place).add(columns, row_num); |
357 | } |
358 | |
359 | void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); } |
360 | |
361 | void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).write(buf); } |
362 | |
363 | void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).read(buf); } |
364 | |
365 | void predictValues( |
366 | ConstAggregateDataPtr place, |
367 | IColumn & to, |
368 | Block & block, |
369 | size_t offset, |
370 | size_t limit, |
371 | const ColumnNumbers & arguments, |
372 | const Context & context) const override |
373 | { |
374 | if (arguments.size() != param_num + 1) |
375 | throw Exception( |
376 | "Predict got incorrect number of arguments. Got: " + std::to_string(arguments.size()) |
377 | + ". Required: " + std::to_string(param_num + 1), |
378 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
379 | |
380 | /// This cast might be correct because column type is based on getReturnTypeToPredict. |
381 | auto * column = typeid_cast<ColumnFloat64 *>(&to); |
382 | if (!column) |
383 | throw Exception("Cast of column of predictions is incorrect. getReturnTypeToPredict must return same value as it is casted to" , |
384 | ErrorCodes::BAD_CAST); |
385 | |
386 | this->data(place).predict(column->getData(), block, offset, limit, arguments, context); |
387 | } |
388 | |
389 | /** This function is called if aggregate function without State modifier is selected in a query. |
390 | * Inserts all weights of the model into the column 'to', so user may use such information if needed |
391 | */ |
392 | void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override |
393 | { |
394 | this->data(place).returnWeights(to); |
395 | } |
396 | |
397 | private: |
398 | UInt64 param_num; |
399 | Float64 learning_rate; |
400 | Float64 l2_reg_coef; |
401 | UInt64 batch_size; |
402 | std::shared_ptr<IGradientComputer> gradient_computer; |
403 | std::string weights_updater_name; |
404 | }; |
405 | |
/// Name tag usable as the `Name` template argument of AggregateFunctionMLMethod,
/// whose getName() returns Name::name.
struct NameLinearRegression
{
    static constexpr const char * name = "stochasticLinearRegression";
};
/// Name tag usable as the `Name` template argument of AggregateFunctionMLMethod,
/// whose getName() returns Name::name.
struct NameLogisticRegression
{
    static constexpr const char * name = "stochasticLogisticRegression";
};
414 | |
415 | } |
416 | |