#pragma once

#include <Columns/ColumnVector.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include "IAggregateFunction.h"

namespace DB
{
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    extern const int BAD_CAST;
    /// LOGICAL_ERROR is thrown from create() below, so it must be declared here as well.
    extern const int LOGICAL_ERROR;
}

/** IGradientComputer computes the gradient of its loss function at a given point.
  * Each regression method provides its own implementation.
  */
class IGradientComputer
{
public:
    IGradientComputer() = default;

    virtual ~IGradientComputer() = default;

    /// Adds the gradient computed at the point (weights, bias) for the given row to batch_gradient.
    virtual void compute(
        std::vector<Float64> & batch_gradient,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) = 0;

    /// Fills the container with model predictions for rows [offset, offset + limit) of the argument columns.
    virtual void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        size_t offset,
        size_t limit,
        const ColumnNumbers & arguments,
        const std::vector<Float64> & weights,
        Float64 bias,
        const Context & context) const = 0;
};


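/** Squared-error loss. A minimal sketch of the gradient it accumulates, assuming
  * the prediction p = w.x + bias and the loss (target - p)^2 with L2 regularization
  * (exact sign and scaling conventions live in the .cpp implementation):
  *
  *     d/dw_i:  2 * (target - p) * x_i - 2 * l2_reg_coef * w_i
  *     d/dbias: 2 * (target - p)
  */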
class LinearRegression : public IGradientComputer
{
public:
    LinearRegression() = default;

    void compute(
        std::vector<Float64> & batch_gradient,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        size_t offset,
        size_t limit,
        const ColumnNumbers & arguments,
        const std::vector<Float64> & weights,
        Float64 bias,
        const Context & context) const override;
};


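/** Logistic (log) loss for binary classification. A sketch under the usual assumption
  * that the model is the sigmoid 1 / (1 + exp(-(w.x + bias))); the exact target
  * encoding and gradient signs are fixed in the .cpp implementation, and predict()
  * maps the linear score through the sigmoid to produce probabilities.
  */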
class LogisticRegression : public IGradientComputer
{
public:
    LogisticRegression() = default;

    void compute(
        std::vector<Float64> & batch_gradient,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        size_t offset,
        size_t limit,
        const ColumnNumbers & arguments,
        const std::vector<Float64> & weights,
        Float64 bias,
        const Context & context) const override;
};


/** IWeightsUpdater defines how the current weights are updated
  * and invokes the gradient computer on each iteration.
  */
class IWeightsUpdater
{
public:
    virtual ~IWeightsUpdater() = default;

    /// Calls the gradient computer to add the gradient at the given row to the current mini-batch.
    virtual void add_to_batch(
        std::vector<Float64> & batch_gradient,
        IGradientComputer & gradient_computer,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num);

    /// Updates the current weights according to the gradient of the last mini-batch.
    virtual void update(
        UInt64 batch_size,
        std::vector<Float64> & weights,
        Float64 & bias,
        Float64 learning_rate,
        const std::vector<Float64> & gradient) = 0;

    /// Used when merging two states; frac and rhs_frac weight the contributions of this state and of rhs.
    virtual void merge(const IWeightsUpdater &, Float64 /*frac*/, Float64 /*rhs_frac*/) {}

    /// Used for serialization when necessary.
    virtual void write(WriteBuffer &) const {}

    /// Used for deserialization when necessary.
    virtual void read(ReadBuffer &) {}
};


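/** Plain mini-batch stochastic gradient descent. A minimal sketch of the step it
  * applies, assuming batch_gradient holds the sum accumulated by the gradient
  * computer over the batch:
  *
  *     w += learning_rate * batch_gradient / batch_size
  */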
class StochasticGradientDescent : public IWeightsUpdater
{
public:
    void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
};


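/** SGD with momentum. A minimal sketch, assuming the classic formulation where
  * v is accumulated_gradient and alpha_ is the momentum decay factor:
  *
  *     v  = alpha_ * v + learning_rate * batch_gradient / batch_size
  *     w += v
  */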
class Momentum : public IWeightsUpdater
{
public:
    Momentum() = default;

    explicit Momentum(Float64 alpha) : alpha_(alpha) {}

    void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;

    void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;

    void write(WriteBuffer & buf) const override;

    void read(ReadBuffer & buf) override;

private:
    Float64 alpha_{0.1};
    std::vector<Float64> accumulated_gradient;
};


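/** Nesterov accelerated gradient. A minimal sketch, assuming the usual look-ahead
  * formulation: add_to_batch is overridden so that the gradient is evaluated at the
  * shifted point (weights + alpha_ * v) rather than at the current weights, and
  * update then applies the same momentum step as above.
  */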
class Nesterov : public IWeightsUpdater
{
public:
    Nesterov() = default;

    explicit Nesterov(Float64 alpha) : alpha_(alpha) {}

    void add_to_batch(
        std::vector<Float64> & batch_gradient,
        IGradientComputer & gradient_computer,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;

    void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;

    void write(WriteBuffer & buf) const override;

    void read(ReadBuffer & buf) override;

private:
    const Float64 alpha_ = 0.9;
    std::vector<Float64> accumulated_gradient;
};


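/** Adam optimizer. A sketch of the textbook update, where m is average_gradient,
  * v is average_squared_gradient, g is the mini-batch gradient, and
  * beta1_powered_ / beta2_powered_ hold beta^t for bias correction (the
  * implementation may fold the correction terms slightly differently):
  *
  *     m  = beta1_ * m + (1 - beta1_) * g
  *     v  = beta2_ * v + (1 - beta2_) * g^2
  *     w += learning_rate * (m / (1 - beta1_powered_)) / (sqrt(v / (1 - beta2_powered_)) + eps_)
  */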
class Adam : public IWeightsUpdater
{
public:
    Adam()
    {
        beta1_powered_ = beta1_;
        beta2_powered_ = beta2_;
    }

    void add_to_batch(
        std::vector<Float64> & batch_gradient,
        IGradientComputer & gradient_computer,
        const std::vector<Float64> & weights,
        Float64 bias,
        Float64 l2_reg_coef,
        Float64 target,
        const IColumn ** columns,
        size_t row_num) override;

    void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;

    void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;

    void write(WriteBuffer & buf) const override;

    void read(ReadBuffer & buf) override;

private:
    /// Recommended default values for the beta1 and beta2 hyperparameters.
    const Float64 beta1_ = 0.9;
    const Float64 beta2_ = 0.999;
    const Float64 eps_ = 0.000001;
    Float64 beta1_powered_;
    Float64 beta2_powered_;

    std::vector<Float64> average_gradient;
    std::vector<Float64> average_squared_gradient;
};


/** LinearModelData manages the current state of learning.
  */
class LinearModelData
{
public:
    LinearModelData() = default;

    LinearModelData(
        Float64 learning_rate_,
        Float64 l2_reg_coef_,
        UInt64 param_num_,
        UInt64 batch_capacity_,
        std::shared_ptr<IGradientComputer> gradient_computer_,
        std::shared_ptr<IWeightsUpdater> weights_updater_);

    void add(const IColumn ** columns, size_t row_num);

    void merge(const LinearModelData & rhs);

    void write(WriteBuffer & buf) const;

    void read(ReadBuffer & buf);

    void predict(
        ColumnVector<Float64>::Container & container,
        Block & block,
        size_t offset,
        size_t limit,
        const ColumnNumbers & arguments,
        const Context & context) const;

    void returnWeights(IColumn & to) const;

private:
    std::vector<Float64> weights;
    Float64 bias{0.0};

    Float64 learning_rate;
    Float64 l2_reg_coef;
    UInt64 batch_capacity;

    UInt64 iter_num = 0;
    std::vector<Float64> gradient_batch;
    UInt64 batch_size;

    std::shared_ptr<IGradientComputer> gradient_computer;
    std::shared_ptr<IWeightsUpdater> weights_updater;

    /// Flushes the current batch and updates the weights.
    void update_state();
};


template <
    /// Implemented machine learning method
    typename Data,
    /// Name of the method
    typename Name>
class AggregateFunctionMLMethod final : public IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>
{
public:
    String getName() const override { return Name::name; }

    explicit AggregateFunctionMLMethod(
        UInt32 param_num_,
        std::unique_ptr<IGradientComputer> gradient_computer_,
        std::string weights_updater_name_,
        Float64 learning_rate_,
        Float64 l2_reg_coef_,
        UInt64 batch_size_,
        const DataTypes & arguments_types,
        const Array & params)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>(arguments_types, params)
        , param_num(param_num_)
        , learning_rate(learning_rate_)
        , l2_reg_coef(l2_reg_coef_)
        , batch_size(batch_size_)
        , gradient_computer(std::move(gradient_computer_))
        , weights_updater_name(std::move(weights_updater_name_))
    {
    }

    /// Called when SELECT linearRegression(...) is executed: the result is the array of model weights.
    DataTypePtr getReturnType() const override
    {
        return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>());
    }

    /// Called from the evalMLMethod function so that predictValues is given the correct column type.
    DataTypePtr getReturnTypeToPredict() const override
    {
        return std::make_shared<DataTypeNumber<Float64>>();
    }

    void create(AggregateDataPtr place) const override
    {
        std::shared_ptr<IWeightsUpdater> new_weights_updater;
        if (weights_updater_name == "SGD")
            new_weights_updater = std::make_shared<StochasticGradientDescent>();
        else if (weights_updater_name == "Momentum")
            new_weights_updater = std::make_shared<Momentum>();
        else if (weights_updater_name == "Nesterov")
            new_weights_updater = std::make_shared<Nesterov>();
        else if (weights_updater_name == "Adam")
            new_weights_updater = std::make_shared<Adam>();
        else
            throw Exception("Illegal name of weights updater (should have been checked earlier)", ErrorCodes::LOGICAL_ERROR);

        new (place) Data(learning_rate, l2_reg_coef, param_num, batch_size, gradient_computer, new_weights_updater);
    }

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        this->data(place).add(columns, row_num);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).write(buf); }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).read(buf); }

    void predictValues(
        ConstAggregateDataPtr place,
        IColumn & to,
        Block & block,
        size_t offset,
        size_t limit,
        const ColumnNumbers & arguments,
        const Context & context) const override
    {
        if (arguments.size() != param_num + 1)
            throw Exception(
                "Predict got incorrect number of arguments. Got: " + std::to_string(arguments.size())
                    + ". Required: " + std::to_string(param_num + 1),
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// The cast is expected to succeed because the column type is based on getReturnTypeToPredict.
        auto * column = typeid_cast<ColumnFloat64 *>(&to);
        if (!column)
            throw Exception(
                "Cast of column of predictions is incorrect. getReturnTypeToPredict must return the same type as the column it is casted to",
                ErrorCodes::BAD_CAST);

        this->data(place).predict(column->getData(), block, offset, limit, arguments, context);
    }


    /** Called when the aggregate function is used without the -State modifier in a query.
      * Inserts all weights of the model into the column 'to', so the user may inspect them if needed.
      */
    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        this->data(place).returnWeights(to);
    }

private:
    UInt64 param_num;
    Float64 learning_rate;
    Float64 l2_reg_coef;
    UInt64 batch_size;
    std::shared_ptr<IGradientComputer> gradient_computer;
    std::string weights_updater_name;
};

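/** These structs give the aggregate functions their SQL-visible names. A hedged usage
  * sketch (table names are illustrative; the four parameters follow the constructor
  * above: learning_rate, l2_reg_coef, batch_size, weights updater name):
  *
  *     -- train a model and keep its serialized state
  *     CREATE TABLE your_model ENGINE = Memory AS
  *     SELECT stochasticLinearRegressionState(0.1, 0.0, 5, 'SGD')(target, param1, param2) AS state
  *     FROM train_data;
  *
  *     -- predict: evalMLMethod dispatches to predictValues above
  *     WITH (SELECT state FROM your_model) AS model
  *     SELECT evalMLMethod(model, param1, param2) FROM test_data;
  */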
struct NameLinearRegression
{
    static constexpr auto name = "stochasticLinearRegression";
};

struct NameLogisticRegression
{
    static constexpr auto name = "stochasticLogisticRegression";
};

}