1#pragma once
2
3#include <Functions/FunctionHelpers.h>
4#include <IO/WriteHelpers.h>
5#include <DataTypes/getLeastSupertype.h>
6#include <DataTypes/DataTypeArray.h>
7#include <DataTypes/DataTypesNumber.h>
8#include <DataTypes/DataTypesDecimal.h>
9#include <DataTypes/DataTypeDateTime64.h>
10#include <Columns/ColumnVector.h>
11#include <Interpreters/castColumn.h>
12#include "IFunctionImpl.h"
13#include <Common/intExp.h>
14#include <Common/assert_cast.h>
15#include <Core/Defines.h>
16#include <cmath>
17#include <type_traits>
18#include <array>
19#include <ext/bit_cast.h>
20#include <algorithm>
21
22#ifdef __SSE4_1__
23 #include <smmintrin.h>
24#endif
25
26
27namespace DB
28{
29
30namespace ErrorCodes
31{
32 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
33 extern const int ILLEGAL_TYPE_OF_ARGUMENT;
34 extern const int ILLEGAL_COLUMN;
35 extern const int LOGICAL_ERROR;
36 extern const int BAD_ARGUMENTS;
37}
38
39
/** Rounding functions:
  * round(x, N) - round to the nearest value (N = 0 by default). Uses banker's rounding for floating point numbers.
  * roundBankers(x, N) - round to the nearest value (N = 0 by default). Uses banker's rounding for all numbers.
  * floor(x, N) - the largest round number that is <= x (N = 0 by default).
  * ceil(x, N) - the smallest round number that is >= x (N = 0 by default).
  * trunc(x, N) - the round number with the largest absolute value that does not exceed the absolute value of x (N = 0 by default).
  *
  * The value of the parameter N (scale):
  * - N > 0: round to a number with N digits after the decimal point
  * - N < 0: round to an integer with |N| trailing zeros
  * - N = 0: round to an integer
  *
  * The type of the result is the type of the argument.
  * For integer arguments, overflow can occur when a negative scale is passed.
  * In that case, the behavior is implementation specific.
  */
56
57
/// Selects how the scale argument N of a rounding function is interpreted.
enum class ScaleMode
{
    /// N > 0: round to a number with N decimal places after the decimal point.
    Positive,
    /// N < 0: round to an integer with |N| trailing zeros.
    Negative,
    /// N = 0: round to an integer.
    Zero,
};
66
/// Direction of rounding.
/// When SSE4.1 is available the enumerators are built from the _MM_FROUND_* rounding-control constants.
enum class RoundingMode
{
#if defined(__SSE4_1__)
    Round = _MM_FROUND_TO_NEAREST_INT | _MM_FROUND_NO_EXC,
    Floor = _MM_FROUND_TO_NEG_INF | _MM_FROUND_NO_EXC,
    Ceil = _MM_FROUND_TO_POS_INF | _MM_FROUND_NO_EXC,
    Trunc = _MM_FROUND_TO_ZERO | _MM_FROUND_NO_EXC,
#else
    /// The values correspond to the SSE4.1 constants above, just in case.
    Round = 8,
    Floor = 9,
    Ceil = 10,
    Trunc = 11,
#endif
};
81
/// Selects how values lying exactly halfway between two candidates are rounded.
enum class TieBreakingMode
{
    /// Banker's rounding for floating point numbers, round up otherwise.
    Auto,
    /// Always banker's rounding.
    Bankers,
};
87
88
89/** Rounding functions for integer values.
90 */
91template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode, TieBreakingMode tie_breaking_mode>
92struct IntegerRoundingComputation
93{
94 static const size_t data_count = 1;
95
96 static size_t prepare(size_t scale)
97 {
98 return scale;
99 }
100
101 static ALWAYS_INLINE T computeImpl(T x, T scale)
102 {
103 switch (rounding_mode)
104 {
105 case RoundingMode::Trunc:
106 {
107 return x / scale * scale;
108 }
109 case RoundingMode::Floor:
110 {
111 if (x < 0)
112 x -= scale - 1;
113 return x / scale * scale;
114 }
115 case RoundingMode::Ceil:
116 {
117 if (x >= 0)
118 x += scale - 1;
119 return x / scale * scale;
120 }
121 case RoundingMode::Round:
122 {
123 if (x < 0)
124 x -= scale;
125 switch (tie_breaking_mode)
126 {
127 case TieBreakingMode::Auto:
128 x = (x + scale / 2) / scale * scale;
129 break;
130 case TieBreakingMode::Bankers:
131 {
132 T quotient = (x + scale / 2) / scale;
133 if (quotient * scale == x + scale / 2)
134 // round half to even
135 x = ((quotient + (x < 0)) & ~1) * scale;
136 else
137 // round the others as usual
138 x = quotient * scale;
139 break;
140 }
141 }
142 return x;
143 }
144 }
145
146 __builtin_unreachable();
147 }
148
149 static ALWAYS_INLINE T compute(T x, T scale)
150 {
151 switch (scale_mode)
152 {
153 case ScaleMode::Zero:
154 return x;
155 case ScaleMode::Positive:
156 return x;
157 case ScaleMode::Negative:
158 return computeImpl(x, scale);
159 }
160
161 __builtin_unreachable();
162 }
163
164 static ALWAYS_INLINE void compute(const T * __restrict in, size_t scale, T * __restrict out)
165 {
166 if (sizeof(T) <= sizeof(scale) && scale > size_t(std::numeric_limits<T>::max()))
167 *out = 0;
168 else
169 *out = compute(*in, scale);
170 }
171
172};
173
174
175#ifdef __SSE4_1__
176
177template <typename T>
178class BaseFloatRoundingComputation;
179
180template <>
181class BaseFloatRoundingComputation<Float32>
182{
183public:
184 using ScalarType = Float32;
185 using VectorType = __m128;
186 static const size_t data_count = 4;
187
188 static VectorType load(const ScalarType * in) { return _mm_loadu_ps(in); }
189 static VectorType load1(const ScalarType in) { return _mm_load1_ps(&in); }
190 static void store(ScalarType * out, VectorType val) { _mm_storeu_ps(out, val);}
191 static VectorType multiply(VectorType val, VectorType scale) { return _mm_mul_ps(val, scale); }
192 static VectorType divide(VectorType val, VectorType scale) { return _mm_div_ps(val, scale); }
193 template <RoundingMode mode> static VectorType apply(VectorType val) { return _mm_round_ps(val, int(mode)); }
194
195 static VectorType prepare(size_t scale)
196 {
197 return load1(scale);
198 }
199};
200
201template <>
202class BaseFloatRoundingComputation<Float64>
203{
204public:
205 using ScalarType = Float64;
206 using VectorType = __m128d;
207 static const size_t data_count = 2;
208
209 static VectorType load(const ScalarType * in) { return _mm_loadu_pd(in); }
210 static VectorType load1(const ScalarType in) { return _mm_load1_pd(&in); }
211 static void store(ScalarType * out, VectorType val) { _mm_storeu_pd(out, val);}
212 static VectorType multiply(VectorType val, VectorType scale) { return _mm_mul_pd(val, scale); }
213 static VectorType divide(VectorType val, VectorType scale) { return _mm_div_pd(val, scale); }
214 template <RoundingMode mode> static VectorType apply(VectorType val) { return _mm_round_pd(val, int(mode)); }
215
216 static VectorType prepare(size_t scale)
217 {
218 return load1(scale);
219 }
220};
221
222#else
223
224/// Implementation for ARM. Not vectorized.
225
226inline float roundWithMode(float x, RoundingMode mode)
227{
228 switch (mode)
229 {
230 case RoundingMode::Round: return roundf(x);
231 case RoundingMode::Floor: return floorf(x);
232 case RoundingMode::Ceil: return ceilf(x);
233 case RoundingMode::Trunc: return truncf(x);
234 }
235
236 __builtin_unreachable();
237}
238
239inline double roundWithMode(double x, RoundingMode mode)
240{
241 switch (mode)
242 {
243 case RoundingMode::Round: return round(x);
244 case RoundingMode::Floor: return floor(x);
245 case RoundingMode::Ceil: return ceil(x);
246 case RoundingMode::Trunc: return trunc(x);
247 }
248
249 __builtin_unreachable();
250}
251
252template <typename T>
253class BaseFloatRoundingComputation
254{
255public:
256 using ScalarType = T;
257 using VectorType = T;
258 static const size_t data_count = 1;
259
260 static VectorType load(const ScalarType * in) { return *in; }
261 static VectorType load1(const ScalarType in) { return in; }
262 static VectorType store(ScalarType * out, ScalarType val) { return *out = val;}
263 static VectorType multiply(VectorType val, VectorType scale) { return val * scale; }
264 static VectorType divide(VectorType val, VectorType scale) { return val / scale; }
265 template <RoundingMode mode> static VectorType apply(VectorType val) { return roundWithMode(val, mode); }
266
267 static VectorType prepare(size_t scale)
268 {
269 return load1(scale);
270 }
271};
272
273#endif
274
275
276/** Implementation of low-level round-off functions for floating-point values.
277 */
278template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
279class FloatRoundingComputation : public BaseFloatRoundingComputation<T>
280{
281 using Base = BaseFloatRoundingComputation<T>;
282
283public:
284 static inline void compute(const T * __restrict in, const typename Base::VectorType & scale, T * __restrict out)
285 {
286 auto val = Base::load(in);
287
288 if (scale_mode == ScaleMode::Positive)
289 val = Base::multiply(val, scale);
290 else if (scale_mode == ScaleMode::Negative)
291 val = Base::divide(val, scale);
292
293 val = Base::template apply<rounding_mode>(val);
294
295 if (scale_mode == ScaleMode::Positive)
296 val = Base::divide(val, scale);
297 else if (scale_mode == ScaleMode::Negative)
298 val = Base::multiply(val, scale);
299
300 Base::store(out, val);
301 }
302};
303
304
305/** Implementing high-level rounding functions.
306 */
307template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
308struct FloatRoundingImpl
309{
310private:
311 using Op = FloatRoundingComputation<T, rounding_mode, scale_mode>;
312 using Data = std::array<T, Op::data_count>;
313
314public:
315 static NO_INLINE void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container & out)
316 {
317 auto mm_scale = Op::prepare(scale);
318
319 const size_t data_count = std::tuple_size<Data>();
320
321 const T* end_in = in.data() + in.size();
322 const T* limit = in.data() + in.size() / data_count * data_count;
323
324 const T* __restrict p_in = in.data();
325 T* __restrict p_out = out.data();
326
327 while (p_in < limit)
328 {
329 Op::compute(p_in, mm_scale, p_out);
330 p_in += data_count;
331 p_out += data_count;
332 }
333
334 if (p_in < end_in)
335 {
336 Data tmp_src{{}};
337 Data tmp_dst;
338
339 size_t tail_size_bytes = (end_in - p_in) * sizeof(*p_in);
340
341 memcpy(&tmp_src, p_in, tail_size_bytes);
342 Op::compute(reinterpret_cast<T *>(&tmp_src), mm_scale, reinterpret_cast<T *>(&tmp_dst));
343 memcpy(p_out, &tmp_dst, tail_size_bytes);
344 }
345 }
346};
347
348template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode, TieBreakingMode tie_breaking_mode>
349struct IntegerRoundingImpl
350{
351private:
352 using Op = IntegerRoundingComputation<T, rounding_mode, scale_mode, tie_breaking_mode>;
353
354public:
355 template <size_t scale>
356 static NO_INLINE void applyImpl(const PaddedPODArray<T> & in, typename ColumnVector<T>::Container & out)
357 {
358 const T * end_in = in.data() + in.size();
359
360 const T * __restrict p_in = in.data();
361 T * __restrict p_out = out.data();
362
363 while (p_in < end_in)
364 {
365 Op::compute(p_in, scale, p_out);
366 ++p_in;
367 ++p_out;
368 }
369 }
370
371 static NO_INLINE void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container & out)
372 {
373 /// Manual function cloning for compiler to generate integer division by constant.
374 switch (scale)
375 {
376 case 1ULL: return applyImpl<1ULL>(in, out);
377 case 10ULL: return applyImpl<10ULL>(in, out);
378 case 100ULL: return applyImpl<100ULL>(in, out);
379 case 1000ULL: return applyImpl<1000ULL>(in, out);
380 case 10000ULL: return applyImpl<10000ULL>(in, out);
381 case 100000ULL: return applyImpl<100000ULL>(in, out);
382 case 1000000ULL: return applyImpl<1000000ULL>(in, out);
383 case 10000000ULL: return applyImpl<10000000ULL>(in, out);
384 case 100000000ULL: return applyImpl<100000000ULL>(in, out);
385 case 1000000000ULL: return applyImpl<1000000000ULL>(in, out);
386 case 10000000000ULL: return applyImpl<10000000000ULL>(in, out);
387 case 100000000000ULL: return applyImpl<100000000000ULL>(in, out);
388 case 1000000000000ULL: return applyImpl<1000000000000ULL>(in, out);
389 case 10000000000000ULL: return applyImpl<10000000000000ULL>(in, out);
390 case 100000000000000ULL: return applyImpl<100000000000000ULL>(in, out);
391 case 1000000000000000ULL: return applyImpl<1000000000000000ULL>(in, out);
392 case 10000000000000000ULL: return applyImpl<10000000000000000ULL>(in, out);
393 case 100000000000000000ULL: return applyImpl<100000000000000000ULL>(in, out);
394 case 1000000000000000000ULL: return applyImpl<1000000000000000000ULL>(in, out);
395 case 10000000000000000000ULL: return applyImpl<10000000000000000000ULL>(in, out);
396 default:
397 throw Exception("Logical error: unexpected 'scale' parameter passed to function IntegerRoundingComputation::compute",
398 ErrorCodes::LOGICAL_ERROR);
399 }
400 }
401};
402
403
404template <typename T, RoundingMode rounding_mode, TieBreakingMode tie_breaking_mode>
405class DecimalRoundingImpl
406{
407private:
408 using NativeType = typename T::NativeType;
409 using Op = IntegerRoundingComputation<NativeType, rounding_mode, ScaleMode::Negative, tie_breaking_mode>;
410 using Container = typename ColumnDecimal<T>::Container;
411
412public:
413 static NO_INLINE void apply(const Container & in, Container & out, Int64 scale_arg)
414 {
415 scale_arg = in.getScale() - scale_arg;
416 if (scale_arg > 0)
417 {
418 size_t scale = intExp10(scale_arg);
419
420 const NativeType * __restrict p_in = reinterpret_cast<const NativeType *>(in.data());
421 const NativeType * end_in = reinterpret_cast<const NativeType *>(in.data()) + in.size();
422 NativeType * __restrict p_out = reinterpret_cast<NativeType *>(out.data());
423
424 while (p_in < end_in)
425 {
426 Op::compute(p_in, scale, p_out);
427 ++p_in;
428 ++p_out;
429 }
430 }
431 else
432 memcpy(out.data(), in.data(), in.size() * sizeof(T));
433 }
434};
435
436
437/** Select the appropriate processing algorithm depending on the scale.
438 */
439template <typename T, RoundingMode rounding_mode, TieBreakingMode tie_breaking_mode>
440class Dispatcher
441{
442 template <ScaleMode scale_mode>
443 using FunctionRoundingImpl = std::conditional_t<std::is_floating_point_v<T>,
444 FloatRoundingImpl<T, rounding_mode, scale_mode>,
445 IntegerRoundingImpl<T, rounding_mode, scale_mode, tie_breaking_mode>>;
446
447 static void apply(Block & block, const ColumnVector<T> * col, Int64 scale_arg, size_t result)
448 {
449 auto col_res = ColumnVector<T>::create();
450
451 typename ColumnVector<T>::Container & vec_res = col_res->getData();
452 vec_res.resize(col->getData().size());
453
454 if (!vec_res.empty())
455 {
456 if (scale_arg == 0)
457 {
458 size_t scale = 1;
459 FunctionRoundingImpl<ScaleMode::Zero>::apply(col->getData(), scale, vec_res);
460 }
461 else if (scale_arg > 0)
462 {
463 size_t scale = intExp10(scale_arg);
464 FunctionRoundingImpl<ScaleMode::Positive>::apply(col->getData(), scale, vec_res);
465 }
466 else
467 {
468 size_t scale = intExp10(-scale_arg);
469 FunctionRoundingImpl<ScaleMode::Negative>::apply(col->getData(), scale, vec_res);
470 }
471 }
472
473 block.getByPosition(result).column = std::move(col_res);
474 }
475
476 static void apply(Block & block, const ColumnDecimal<T> * col, Int64 scale_arg, size_t result)
477 {
478 const typename ColumnDecimal<T>::Container & vec_src = col->getData();
479
480 auto col_res = ColumnDecimal<T>::create(vec_src.size(), vec_src.getScale());
481 auto & vec_res = col_res->getData();
482
483 if (!vec_res.empty())
484 DecimalRoundingImpl<T, rounding_mode, tie_breaking_mode>::apply(col->getData(), vec_res, scale_arg);
485
486 block.getByPosition(result).column = std::move(col_res);
487 }
488
489public:
490 static void apply(Block & block, const IColumn * column, Int64 scale_arg, size_t result)
491 {
492 if constexpr (IsNumber<T>)
493 apply(block, checkAndGetColumn<ColumnVector<T>>(column), scale_arg, result);
494 else if constexpr (IsDecimalNumber<T>)
495 apply(block, checkAndGetColumn<ColumnDecimal<T>>(column), scale_arg, result);
496 }
497};
498
499/** A template for functions that round the value of an input parameter of type
500 * (U)Int8/16/32/64, Float32/64 or Decimal32/64/128, and accept an additional optional parameter (default is 0).
501 */
502template <typename Name, RoundingMode rounding_mode, TieBreakingMode tie_breaking_mode>
503class FunctionRounding : public IFunction
504{
505public:
506 static constexpr auto name = Name::name;
507 static FunctionPtr create(const Context &) { return std::make_shared<FunctionRounding>(); }
508
509public:
510 String getName() const override
511 {
512 return name;
513 }
514
515 bool isVariadic() const override { return true; }
516 size_t getNumberOfArguments() const override { return 0; }
517
518 /// Get result types by argument types. If the function does not apply to these arguments, throw an exception.
519 DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
520 {
521 if ((arguments.size() < 1) || (arguments.size() > 2))
522 throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
523 + toString(arguments.size()) + ", should be 1 or 2.",
524 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
525
526 for (const auto & type : arguments)
527 if (!isNumber(type))
528 throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
529 ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
530
531 return arguments[0];
532 }
533
534 static Int64 getScaleArg(Block & block, const ColumnNumbers & arguments)
535 {
536 if (arguments.size() == 2)
537 {
538 const IColumn & scale_column = *block.getByPosition(arguments[1]).column;
539 if (!isColumnConst(scale_column))
540 throw Exception("Scale argument for rounding functions must be constant.", ErrorCodes::ILLEGAL_COLUMN);
541
542 Field scale_field = assert_cast<const ColumnConst &>(scale_column).getField();
543 if (scale_field.getType() != Field::Types::UInt64
544 && scale_field.getType() != Field::Types::Int64)
545 throw Exception("Scale argument for rounding functions must have integer type.", ErrorCodes::ILLEGAL_COLUMN);
546
547 return scale_field.get<Int64>();
548 }
549 return 0;
550 }
551
552 bool useDefaultImplementationForConstants() const override { return true; }
553 ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
554
555 void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
556 {
557 const ColumnWithTypeAndName & column = block.getByPosition(arguments[0]);
558 Int64 scale_arg = getScaleArg(block, arguments);
559
560 auto call = [&](const auto & types) -> bool
561 {
562 using Types = std::decay_t<decltype(types)>;
563 using DataType = typename Types::LeftType;
564
565 if constexpr (IsDataTypeNumber<DataType> || IsDataTypeDecimal<DataType>)
566 {
567 using FieldType = typename DataType::FieldType;
568 Dispatcher<FieldType, rounding_mode, tie_breaking_mode>::apply(block, column.column.get(), scale_arg, result);
569 return true;
570 }
571 return false;
572 };
573
574 if (!callOnIndexAndDataType<void>(column.type->getTypeId(), call))
575 {
576 throw Exception("Illegal column " + column.name + " of argument of function " + getName(),
577 ErrorCodes::ILLEGAL_COLUMN);
578 }
579 }
580
581 bool hasInformationAboutMonotonicity() const override
582 {
583 return true;
584 }
585
586 Monotonicity getMonotonicityForRange(const IDataType &, const Field &, const Field &) const override
587 {
588 return { true, true, true };
589 }
590};
591
592
593/** Rounds down to a number within explicitly specified array.
594 * If the value is less than the minimal bound - returns the minimal bound.
595 */
596class FunctionRoundDown : public IFunction
597{
598public:
599 static constexpr auto name = "roundDown";
600 static FunctionPtr create(const Context & context) { return std::make_shared<FunctionRoundDown>(context); }
601 FunctionRoundDown(const Context & context_) : context(context_) {}
602
603public:
604 String getName() const override { return name; }
605
606 bool isVariadic() const override { return false; }
607 size_t getNumberOfArguments() const override { return 2; }
608 bool useDefaultImplementationForConstants() const override { return true; }
609 ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
610
611 DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
612 {
613 const DataTypePtr & type_x = arguments[0];
614
615 if (!isNumber(type_x))
616 throw Exception{"Unsupported type " + type_x->getName()
617 + " of first argument of function " + getName()
618 + ", must be numeric type.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
619
620 const DataTypeArray * type_arr = checkAndGetDataType<DataTypeArray>(arguments[1].get());
621
622 if (!type_arr)
623 throw Exception{"Second argument of function " + getName()
624 + ", must be array of boundaries to round to.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
625
626 const auto type_arr_nested = type_arr->getNestedType();
627
628 if (!isNumber(type_arr_nested))
629 {
630 throw Exception{"Elements of array of second argument of function " + getName()
631 + " must be numeric type.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
632 }
633 return getLeastSupertype({type_x, type_arr_nested});
634 }
635
636 void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t) override
637 {
638 auto in_column = block.getByPosition(arguments[0]).column;
639 const auto & in_type = block.getByPosition(arguments[0]).type;
640
641 auto array_column = block.getByPosition(arguments[1]).column;
642 const auto & array_type = block.getByPosition(arguments[1]).type;
643
644 const auto & return_type = block.getByPosition(result).type;
645 auto column_result = return_type->createColumn();
646 auto out = column_result.get();
647
648 if (!in_type->equals(*return_type))
649 in_column = castColumn(block.getByPosition(arguments[0]), return_type, context);
650
651 if (!array_type->equals(*return_type))
652 array_column = castColumn(block.getByPosition(arguments[1]), std::make_shared<DataTypeArray>(return_type), context);
653
654 const auto in = in_column.get();
655 auto boundaries = typeid_cast<const ColumnConst &>(*array_column).getValue<Array>();
656 size_t num_boundaries = boundaries.size();
657 if (!num_boundaries)
658 throw Exception("Empty array is illegal for boundaries in " + getName() + " function", ErrorCodes::BAD_ARGUMENTS);
659
660 if (!executeNum<UInt8>(in, out, boundaries)
661 && !executeNum<UInt16>(in, out, boundaries)
662 && !executeNum<UInt32>(in, out, boundaries)
663 && !executeNum<UInt64>(in, out, boundaries)
664 && !executeNum<Int8>(in, out, boundaries)
665 && !executeNum<Int16>(in, out, boundaries)
666 && !executeNum<Int32>(in, out, boundaries)
667 && !executeNum<Int64>(in, out, boundaries)
668 && !executeNum<Float32>(in, out, boundaries)
669 && !executeNum<Float64>(in, out, boundaries)
670 && !executeDecimal<Decimal32>(in, out, boundaries)
671 && !executeDecimal<Decimal64>(in, out, boundaries)
672 && !executeDecimal<Decimal128>(in, out, boundaries))
673 {
674 throw Exception{"Illegal column " + in->getName() + " of first argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN};
675 }
676
677 block.getByPosition(result).column = std::move(column_result);
678 }
679
680private:
681 template <typename T>
682 bool executeNum(const IColumn * in_untyped, IColumn * out_untyped, const Array & boundaries)
683 {
684 const auto in = checkAndGetColumn<ColumnVector<T>>(in_untyped);
685 auto out = typeid_cast<ColumnVector<T> *>(out_untyped);
686 if (!in || !out)
687 return false;
688
689 executeImplNumToNum(in->getData(), out->getData(), boundaries);
690 return true;
691 }
692
693 template <typename T>
694 bool executeDecimal(const IColumn * in_untyped, IColumn * out_untyped, const Array & boundaries)
695 {
696 const auto in = checkAndGetColumn<ColumnDecimal<T>>(in_untyped);
697 auto out = typeid_cast<ColumnDecimal<T> *>(out_untyped);
698 if (!in || !out)
699 return false;
700
701 executeImplNumToNum(in->getData(), out->getData(), boundaries);
702 return true;
703 }
704
705 template <typename Container>
706 void NO_INLINE executeImplNumToNum(const Container & src, Container & dst, const Array & boundaries)
707 {
708 using ValueType = typename Container::value_type;
709 std::vector<ValueType> boundary_values(boundaries.size());
710 for (size_t i = 0; i < boundaries.size(); ++i)
711 boundary_values[i] = boundaries[i].get<ValueType>();
712
713 std::sort(boundary_values.begin(), boundary_values.end());
714 boundary_values.erase(std::unique(boundary_values.begin(), boundary_values.end()), boundary_values.end());
715
716 size_t size = src.size();
717 dst.resize(size);
718
719 if (boundary_values.size() < 32) /// Just a guess
720 {
721 /// Linear search with value on previous iteration as a hint.
722 /// Not optimal if the size of list is large and distribution of values is uniform random.
723
724 auto begin = boundary_values.begin();
725 auto end = boundary_values.end();
726 auto it = begin + (end - begin) / 2;
727
728 for (size_t i = 0; i < size; ++i)
729 {
730 auto value = src[i];
731
732 if (*it < value)
733 {
734 while (it != end && *it <= value)
735 ++it;
736 if (it != begin)
737 --it;
738 }
739 else
740 {
741 while (*it > value && it != begin)
742 --it;
743 }
744
745 dst[i] = *it;
746 }
747 }
748 else
749 {
750 for (size_t i = 0; i < size; ++i)
751 {
752 auto it = std::upper_bound(boundary_values.begin(), boundary_values.end(), src[i]);
753 if (it == boundary_values.end())
754 {
755 dst[i] = boundary_values.back();
756 }
757 else if (it == boundary_values.begin())
758 {
759 dst[i] = boundary_values.front();
760 }
761 else
762 {
763 dst[i] = *(it - 1);
764 }
765 }
766 }
767 }
768
769private:
770 const Context & context;
771};
772
773
/// Name tags: each one supplies the SQL name of a rounding function through its `name` member.
struct NameRound
{
    static constexpr auto name = "round";
};

struct NameRoundBankers
{
    static constexpr auto name = "roundBankers";
};

struct NameCeil
{
    static constexpr auto name = "ceil";
};

struct NameFloor
{
    static constexpr auto name = "floor";
};

struct NameTrunc
{
    static constexpr auto name = "trunc";
};
779
780using FunctionRound = FunctionRounding<NameRound, RoundingMode::Round, TieBreakingMode::Auto>;
781using FunctionRoundBankers = FunctionRounding<NameRoundBankers, RoundingMode::Round, TieBreakingMode::Bankers>;
782using FunctionFloor = FunctionRounding<NameFloor, RoundingMode::Floor, TieBreakingMode::Auto>;
783using FunctionCeil = FunctionRounding<NameCeil, RoundingMode::Ceil, TieBreakingMode::Auto>;
784using FunctionTrunc = FunctionRounding<NameTrunc, RoundingMode::Trunc, TieBreakingMode::Auto>;
785
786}
787