1#include "AggregateFunctionMLMethod.h"
2
3#include <IO/ReadHelpers.h>
4#include <IO/WriteHelpers.h>
5#include <Interpreters/castColumn.h>
6#include <Columns/ColumnArray.h>
7#include <Columns/ColumnTuple.h>
8#include <Common/FieldVisitors.h>
9#include <Common/typeid_cast.h>
10#include <Common/assert_cast.h>
11#include "AggregateFunctionFactory.h"
12#include "FactoryHelpers.h"
13#include "Helpers.h"
14#include "registerAggregateFunctions.h"
15
16
namespace DB
{
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

namespace
{
    using FuncLinearRegression = AggregateFunctionMLMethod<LinearModelData, NameLinearRegression>;
    using FuncLogisticRegression = AggregateFunctionMLMethod<LinearModelData, NameLogisticRegression>;

    template <class Method>
    AggregateFunctionPtr
    createAggregateFunctionMLMethod(const std::string & name, const DataTypes & argument_types, const Array & parameters)
    {
        if (parameters.size() > 4)
            throw Exception(
                "Aggregate function " + name
                    + " requires at most four parameters: learning_rate, l2_regularization_coef, mini-batch size and weights_updater "
                      "method",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        if (argument_types.size() < 2)
            throw Exception(
                "Aggregate function " + name + " requires at least two arguments: target and model's parameters",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        for (size_t i = 0; i < argument_types.size(); ++i)
        {
            if (!isNativeNumber(argument_types[i]))
                throw Exception(
                    "Argument " + std::to_string(i) + " of type " + argument_types[i]->getName()
                        + " must be numeric for aggregate function " + name,
                    ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
        }

        /// These defaults were picked because they performed reasonably well in tests;
        /// tuning the parameters is usually still needed to get better results.
        auto learning_rate = Float64(1.0);
        auto l2_reg_coef = Float64(0.5);
        UInt64 batch_size = 15;

        std::string weights_updater_name = "Adam";
        std::unique_ptr<IGradientComputer> gradient_computer;

        if (!parameters.empty())
        {
            learning_rate = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters[0]);
        }
        if (parameters.size() > 1)
        {
            l2_reg_coef = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters[1]);
        }
        if (parameters.size() > 2)
        {
            batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), parameters[2]);
        }
        if (parameters.size() > 3)
        {
            weights_updater_name = parameters[3].safeGet<String>();
            if (weights_updater_name != "SGD" && weights_updater_name != "Momentum" && weights_updater_name != "Nesterov" && weights_updater_name != "Adam")
                throw Exception("Invalid parameter for weights updater. The only supported are 'SGD', 'Momentum', 'Nesterov' and 'Adam'",
                                ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
        }

        if (std::is_same<Method, FuncLinearRegression>::value)
        {
            gradient_computer = std::make_unique<LinearRegression>();
        }
        else if (std::is_same<Method, FuncLogisticRegression>::value)
        {
            gradient_computer = std::make_unique<LogisticRegression>();
        }
        else
        {
            throw Exception("Such gradient computer is not implemented yet", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
        }

        return std::make_shared<Method>(
            argument_types.size() - 1,
            std::move(gradient_computer),
            weights_updater_name,
            learning_rate,
            l2_reg_coef,
            batch_size,
            argument_types,
            parameters);
    }
}

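/// Registers the SGD-based model-training aggregate functions.
/// A usage sketch (training table and column names are illustrative, not part of this file;
/// the four parameters follow the order parsed in createAggregateFunctionMLMethod above):
///
///     SELECT stochasticLinearRegressionState(0.1, 0.0, 5, 'SGD')(target, feature_1, feature_2) AS state
///     FROM train_data
///
/// The serialized state can then be passed to evalMLMethod(state, feature_1, feature_2) for prediction.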
void registerAggregateFunctionMLMethod(AggregateFunctionFactory & factory)
{
    factory.registerFunction("stochasticLinearRegression", createAggregateFunctionMLMethod<FuncLinearRegression>);
    factory.registerFunction("stochasticLogisticRegression", createAggregateFunctionMLMethod<FuncLogisticRegression>);
}

LinearModelData::LinearModelData(
    Float64 learning_rate_,
    Float64 l2_reg_coef_,
    UInt64 param_num_,
    UInt64 batch_capacity_,
    std::shared_ptr<DB::IGradientComputer> gradient_computer_,
    std::shared_ptr<DB::IWeightsUpdater> weights_updater_)
    : learning_rate(learning_rate_)
    , l2_reg_coef(l2_reg_coef_)
    , batch_capacity(batch_capacity_)
    , batch_size(0)
    , gradient_computer(std::move(gradient_computer_))
    , weights_updater(std::move(weights_updater_))
{
    weights.resize(param_num_, Float64{0.0});
    /// One extra slot at the end of the gradient accumulator is reserved for the bias.
    gradient_batch.resize(param_num_ + 1, Float64{0.0});
}

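/// Applies the gradient accumulated over the current mini-batch through the weights updater
/// and resets the accumulator. Does nothing if no rows have been added since the last update.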
void LinearModelData::update_state()
{
    if (batch_size == 0)
        return;

    weights_updater->update(batch_size, weights, bias, learning_rate, gradient_batch);
    batch_size = 0;
    ++iter_num;
    gradient_batch.assign(gradient_batch.size(), Float64{0.0});
}

void LinearModelData::predict(
    ColumnVector<Float64>::Container & container,
    Block & block,
    size_t offset,
    size_t limit,
    const ColumnNumbers & arguments,
    const Context & context) const
{
    gradient_computer->predict(container, block, offset, limit, arguments, weights, bias, context);
}

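/// Writes the learned coefficients into the result Array column: all weights first, then the bias as the last element.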
void LinearModelData::returnWeights(IColumn & to) const
{
    size_t size = weights.size() + 1;

    ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
    ColumnArray::Offsets & offsets_to = arr_to.getOffsets();

    size_t old_size = offsets_to.back();
    offsets_to.push_back(old_size + size);

    typename ColumnFloat64::Container & val_to
        = assert_cast<ColumnFloat64 &>(arr_to.getData()).getData();

    val_to.reserve(old_size + size);
    for (size_t i = 0; i + 1 < size; ++i)
        val_to.push_back(weights[i]);

    val_to.push_back(bias);
}

void LinearModelData::read(ReadBuffer & buf)
{
    readBinary(bias, buf);
    readBinary(weights, buf);
    readBinary(iter_num, buf);
    readBinary(gradient_batch, buf);
    readBinary(batch_size, buf);
    weights_updater->read(buf);
}

void LinearModelData::write(WriteBuffer & buf) const
{
    writeBinary(bias, buf);
    writeBinary(weights, buf);
    writeBinary(iter_num, buf);
    writeBinary(gradient_batch, buf);
    writeBinary(batch_size, buf);
    weights_updater->write(buf);
}

void LinearModelData::merge(const DB::LinearModelData & rhs)
{
    if (iter_num == 0 && rhs.iter_num == 0)
        return;

    update_state();
    /// can't update rhs state because it's constant

    /// Weighting by the squared number of learning steps, frac = iter_num^2 / (iter_num^2 + rhs.iter_num^2),
    /// gives more stable predictions than a plain average when the merged states have done
    /// quite different numbers of learning steps.
    Float64 frac = (static_cast<Float64>(iter_num) * iter_num) / (iter_num * iter_num + rhs.iter_num * rhs.iter_num);

    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] = weights[i] * frac + rhs.weights[i] * (1 - frac);
    }
    bias = bias * frac + rhs.bias * (1 - frac);

    iter_num += rhs.iter_num;
    weights_updater->merge(*rhs.weights_updater, frac, 1 - frac);
}

void LinearModelData::add(const IColumn ** columns, size_t row_num)
{
    /// The first column holds the target value; the feature columns start at (columns + 1).
    Float64 target = (*columns[0]).getFloat64(row_num);

    weights_updater->add_to_batch(
        gradient_batch, *gradient_computer, weights, bias, l2_reg_coef, target, columns + 1, row_num);

    ++batch_size;
    if (batch_size == batch_capacity)
    {
        update_state();
    }
}

/// Weights updaters

void Adam::write(WriteBuffer & buf) const
{
    writeBinary(average_gradient, buf);
    writeBinary(average_squared_gradient, buf);
}

void Adam::read(ReadBuffer & buf)
{
    readBinary(average_gradient, buf);
    readBinary(average_squared_gradient, buf);
}

void Adam::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
    auto & adam_rhs = static_cast<const Adam &>(rhs);

    if (adam_rhs.average_gradient.empty())
        return;

    if (average_gradient.empty())
    {
        if (!average_squared_gradient.empty() ||
            adam_rhs.average_gradient.size() != adam_rhs.average_squared_gradient.size())
            throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR);

        average_gradient.resize(adam_rhs.average_gradient.size(), Float64{0.0});
        average_squared_gradient.resize(adam_rhs.average_squared_gradient.size(), Float64{0.0});
    }

    for (size_t i = 0; i < average_gradient.size(); ++i)
    {
        average_gradient[i] = average_gradient[i] * frac + adam_rhs.average_gradient[i] * rhs_frac;
        average_squared_gradient[i] = average_squared_gradient[i] * frac + adam_rhs.average_squared_gradient[i] * rhs_frac;
    }
    beta1_powered_ *= adam_rhs.beta1_powered_;
    beta2_powered_ *= adam_rhs.beta2_powered_;
}

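/// Adam step. The updater keeps exponential moving averages of the gradient (average_gradient)
/// and of its elementwise square (average_squared_gradient):
///     m = beta1 * m + (1 - beta1) * g,    v = beta2 * v + (1 - beta2) * g^2,
/// and moves each parameter by learning_rate * m_hat / (sqrt(v_hat) + eps), where
/// m_hat = m / (1 - beta1^t) and v_hat = v / (1 - beta2^t) are the bias-corrected averages
/// (beta1_powered_ and beta2_powered_ hold beta1^t and beta2^t). The last slot of the
/// gradient vector corresponds to the bias.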
void Adam::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
    if (average_gradient.empty())
    {
        if (!average_squared_gradient.empty())
            throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR);

        average_gradient.resize(batch_gradient.size(), Float64{0.0});
        average_squared_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    for (size_t i = 0; i != average_gradient.size(); ++i)
    {
        Float64 normed_gradient = batch_gradient[i] / batch_size;
        average_gradient[i] = beta1_ * average_gradient[i] + (1 - beta1_) * normed_gradient;
        average_squared_gradient[i] = beta2_ * average_squared_gradient[i] +
            (1 - beta2_) * normed_gradient * normed_gradient;
    }

    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += (learning_rate * average_gradient[i]) /
            ((1 - beta1_powered_) * (sqrt(average_squared_gradient[i] / (1 - beta2_powered_)) + eps_));
    }
    bias += (learning_rate * average_gradient[weights.size()]) /
        ((1 - beta1_powered_) * (sqrt(average_squared_gradient[weights.size()] / (1 - beta2_powered_)) + eps_));

    beta1_powered_ *= beta1_;
    beta2_powered_ *= beta2_;
}

void Adam::add_to_batch(
    std::vector<Float64> & batch_gradient,
    IGradientComputer & gradient_computer,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    if (average_gradient.empty())
    {
        average_gradient.resize(batch_gradient.size(), Float64{0.0});
        average_squared_gradient.resize(batch_gradient.size(), Float64{0.0});
    }
    gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num);
}

void Nesterov::read(ReadBuffer & buf)
{
    readBinary(accumulated_gradient, buf);
}

void Nesterov::write(WriteBuffer & buf) const
{
    writeBinary(accumulated_gradient, buf);
}

void Nesterov::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
    auto & nesterov_rhs = static_cast<const Nesterov &>(rhs);
    if (accumulated_gradient.empty())
        accumulated_gradient.resize(nesterov_rhs.accumulated_gradient.size(), Float64{0.0});

    for (size_t i = 0; i < accumulated_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * frac + nesterov_rhs.accumulated_gradient[i] * rhs_frac;
    }
}

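/// The velocity update itself is the same as in Momentum; the Nesterov-specific part is
/// add_to_batch below, which evaluates the gradient at the look-ahead point.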
void Nesterov::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
    if (accumulated_gradient.empty())
    {
        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    for (size_t i = 0; i < batch_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size;
    }
    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += accumulated_gradient[i];
    }
    bias += accumulated_gradient[weights.size()];
}

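/// Nesterov accelerated gradient: the gradient is computed at the shifted ("look-ahead") point
/// weights[i] + alpha * accumulated_gradient[i] (and similarly for the bias) rather than at the
/// current parameters.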
void Nesterov::add_to_batch(
    std::vector<Float64> & batch_gradient,
    IGradientComputer & gradient_computer,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    if (accumulated_gradient.empty())
    {
        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    std::vector<Float64> shifted_weights(weights.size());
    for (size_t i = 0; i != shifted_weights.size(); ++i)
    {
        shifted_weights[i] = weights[i] + accumulated_gradient[i] * alpha_;
    }
    auto shifted_bias = bias + accumulated_gradient[weights.size()] * alpha_;

    gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, l2_reg_coef, target, columns, row_num);
}

void Momentum::read(ReadBuffer & buf)
{
    readBinary(accumulated_gradient, buf);
}

void Momentum::write(WriteBuffer & buf) const
{
    writeBinary(accumulated_gradient, buf);
}

void Momentum::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
    auto & momentum_rhs = static_cast<const Momentum &>(rhs);
    /// As in Nesterov::merge, treat an empty state as zero velocity of the appropriate size,
    /// so that the other side's accumulated gradient is not lost.
    if (accumulated_gradient.empty())
        accumulated_gradient.resize(momentum_rhs.accumulated_gradient.size(), Float64{0.0});

    for (size_t i = 0; i < accumulated_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * frac + momentum_rhs.accumulated_gradient[i] * rhs_frac;
    }
}

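/// Classic momentum: velocity = alpha * velocity + learning_rate * gradient / batch_size,
/// then each parameter (including the bias, stored in the last slot) moves by its velocity.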
void Momentum::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
    /// batch_size is already checked to be greater than 0
    if (accumulated_gradient.empty())
    {
        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
    }

    for (size_t i = 0; i < batch_gradient.size(); ++i)
    {
        accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size;
    }
    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += accumulated_gradient[i];
    }
    bias += accumulated_gradient[weights.size()];
}

void StochasticGradientDescent::update(
    UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
    /// batch_size is already checked to be greater than 0
    for (size_t i = 0; i < weights.size(); ++i)
    {
        weights[i] += (learning_rate * batch_gradient[i]) / batch_size;
    }
    bias += (learning_rate * batch_gradient[weights.size()]) / batch_size;
}

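/// Default behaviour: evaluate the gradient at the current parameters and add it to the batch.
/// Nesterov overrides this to evaluate it at the look-ahead point; Adam only pre-allocates its state first.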
void IWeightsUpdater::add_to_batch(
    std::vector<Float64> & batch_gradient,
    IGradientComputer & gradient_computer,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num);
}

/// Gradient computers

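/// Fills `container` with sigmoid(weights · features + bias) for `limit` rows starting at `offset`.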
void LogisticRegression::predict(
    ColumnVector<Float64>::Container & container,
    Block & block,
    size_t offset,
    size_t limit,
    const ColumnNumbers & arguments,
    const std::vector<Float64> & weights,
    Float64 bias,
    const Context & /*context*/) const
{
    size_t rows_num = block.rows();

    if (offset > rows_num || offset + limit > rows_num)
        throw Exception("Invalid offset and limit for LogisticRegression::predict. "
                        "Block has " + toString(rows_num) + " rows, but offset is " + toString(offset) +
                        " and limit is " + toString(limit), ErrorCodes::LOGICAL_ERROR);

    std::vector<Float64> results(limit, bias);

    for (size_t i = 1; i < arguments.size(); ++i)
    {
        const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);

        if (!isNativeNumber(cur_col.type))
            throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);

        auto & features_column = cur_col.column;

        for (size_t row_num = 0; row_num < limit; ++row_num)
            results[row_num] += weights[i - 1] * features_column->getFloat64(offset + row_num);
    }

    container.reserve(container.size() + limit);
    for (size_t row_num = 0; row_num < limit; ++row_num)
        container.emplace_back(1 / (1 + exp(-results[row_num])));
}

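/// Gradient of the logistic loss for targets in {-1, +1}: with z = weights · x + bias and
/// loss = log(1 + exp(-target * z)), the value accumulated into the batch is
/// target / (1 + exp(target * z)) for the bias slot and the same value times x[i] for each weight,
/// minus the L2 penalty derivative 2 * l2_reg_coef * weights[i]. This is the negative loss
/// gradient, so the weights updaters can simply add it.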
void LogisticRegression::compute(
    std::vector<Float64> & batch_gradient,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    Float64 derivative = bias;
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i]).getFloat64(row_num);
        derivative += weights[i] * value;
    }
    derivative *= target;
    derivative = exp(derivative);

    batch_gradient[weights.size()] += target / (derivative + 1);
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i]).getFloat64(row_num);
        batch_gradient[i] += target * value / (derivative + 1) - 2 * l2_reg_coef * weights[i];
    }
}

void LinearRegression::predict(
    ColumnVector<Float64>::Container & container,
    Block & block,
    size_t offset,
    size_t limit,
    const ColumnNumbers & arguments,
    const std::vector<Float64> & weights,
    Float64 bias,
    const Context & /*context*/) const
{
    if (weights.size() + 1 != arguments.size())
    {
        throw Exception("In predict function number of arguments differs from the size of weights vector", ErrorCodes::LOGICAL_ERROR);
    }

    size_t rows_num = block.rows();

    if (offset > rows_num || offset + limit > rows_num)
        throw Exception("Invalid offset and limit for LinearRegression::predict. "
                        "Block has " + toString(rows_num) + " rows, but offset is " + toString(offset) +
                        " and limit is " + toString(limit), ErrorCodes::LOGICAL_ERROR);

    std::vector<Float64> results(limit, bias);

    for (size_t i = 1; i < arguments.size(); ++i)
    {
        const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);

        if (!isNativeNumber(cur_col.type))
            throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);

        auto features_column = cur_col.column;

        if (!features_column)
            throw Exception("Unexpectedly cannot get features column " + std::to_string(i), ErrorCodes::LOGICAL_ERROR);

        for (size_t row_num = 0; row_num < limit; ++row_num)
            results[row_num] += weights[i - 1] * features_column->getFloat64(row_num + offset);
    }

    container.reserve(container.size() + limit);
    for (size_t row_num = 0; row_num < limit; ++row_num)
        container.emplace_back(results[row_num]);
}

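/// Gradient of the squared error: with prediction = weights · x + bias and
/// loss = (target - prediction)^2, the value accumulated into the batch is
/// 2 * (target - prediction) for the bias slot and the same value times x[i] for each weight,
/// minus the L2 penalty derivative 2 * l2_reg_coef * weights[i].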
void LinearRegression::compute(
    std::vector<Float64> & batch_gradient,
    const std::vector<Float64> & weights,
    Float64 bias,
    Float64 l2_reg_coef,
    Float64 target,
    const IColumn ** columns,
    size_t row_num)
{
    Float64 derivative = (target - bias);
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i]).getFloat64(row_num);
        derivative -= weights[i] * value;
    }
    derivative *= 2;

    batch_gradient[weights.size()] += derivative;
    for (size_t i = 0; i < weights.size(); ++i)
    {
        auto value = (*columns[i]).getFloat64(row_num);
        batch_gradient[i] += derivative * value - 2 * l2_reg_coef * weights[i];
    }
}

}