1#pragma once
2
3#include <IO/WriteHelpers.h>
4#include <IO/ReadHelpers.h>
5
6#include <DataTypes/DataTypeArray.h>
7#include <DataTypes/DataTypesNumber.h>
8#include <DataTypes/DataTypesDecimal.h>
9
10#include <Columns/ColumnVector.h>
11#include <Columns/ColumnArray.h>
12
13#include <Common/ArenaAllocator.h>
14#include <Common/assert_cast.h>
15
16#include <AggregateFunctions/IAggregateFunction.h>
17
18#include <type_traits>
19
20#define AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE 0xFFFFFF
21
22
23namespace DB
24{
25
/// Error codes referenced from this header; the constants themselves are defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int TOO_LARGE_ARRAY_SIZE;
}
31
32
33template <typename T>
34struct MovingSumData
35{
36 // Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
37 using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
38 using Array = PODArray<T, 32, Allocator>;
39
40 Array value;
41 Array window;
42 T sum = 0;
43
44 void add(T val, Arena * arena)
45 {
46 sum += val;
47
48 value.push_back(sum, arena);
49 }
50
51 T get(size_t idx, UInt64 win_size) const
52 {
53 if (idx < win_size)
54 return value[idx];
55 else
56 return value[idx] - value[idx - win_size];
57 }
58
59};
60
61template <typename T>
62struct MovingAvgData
63{
64 // Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
65 using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
66 using Array = PODArray<T, 32, Allocator>;
67
68 Array value;
69 Array window;
70 T sum = 0;
71
72 void add(T val, Arena * arena)
73 {
74 sum += val;
75
76 value.push_back(sum, arena);
77 }
78
79 T get(size_t idx, UInt64 win_size) const
80 {
81 if (idx < win_size)
82 return value[idx] / win_size;
83 else
84 return (value[idx] - value[idx - win_size]) / win_size;
85 }
86
87};
88
89
90
91template <typename T, typename Tlimit_num_elems, typename Data>
92class MovingImpl final
93 : public IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>
94{
95 static constexpr bool limit_num_elems = Tlimit_num_elems::value;
96 DataTypePtr & data_type;
97 UInt64 win_size;
98
99public:
100 using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
101 using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; // probably for overflow function in the future
102
103 explicit MovingImpl(const DataTypePtr & data_type_, UInt64 win_size_ = std::numeric_limits<UInt64>::max())
104 : IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>({data_type_}, {})
105 , data_type(this->argument_types[0]), win_size(win_size_) {}
106
107 String getName() const override { return "movingXXX"; }
108
109 DataTypePtr getReturnType() const override
110 {
111 return std::make_shared<DataTypeArray>(data_type);
112 }
113
114 void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
115 {
116 auto val = static_cast<const ColVecType &>(*columns[0]).getData()[row_num];
117
118 this->data(place).add(val, arena);
119 }
120
121 void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
122 {
123 auto & cur_elems = this->data(place);
124 auto & rhs_elems = this->data(rhs);
125
126 size_t cur_size = cur_elems.value.size();
127
128 if (rhs_elems.value.size())
129 cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
130
131 for (size_t i = cur_size; i < cur_elems.value.size(); ++i)
132 {
133 cur_elems.value[i] += cur_elems.sum;
134 }
135
136 cur_elems.sum += rhs_elems.sum;
137 }
138
139 void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
140 {
141 const auto & value = this->data(place).value;
142 size_t size = value.size();
143 writeVarUInt(size, buf);
144 buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
145 }
146
147 void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
148 {
149 size_t size = 0;
150 readVarUInt(size, buf);
151
152 if (unlikely(size > AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE))
153 throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
154
155 auto & value = this->data(place).value;
156
157 value.resize(size, arena);
158 buf.read(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
159
160 this->data(place).sum = value.back();
161 }
162
163 void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
164 {
165 const auto & data = this->data(place);
166 size_t size = data.value.size();
167
168 ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
169 ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
170
171 offsets_to.push_back(offsets_to.back() + size);
172
173 if (size)
174 {
175 typename ColVecResult::Container & data_to = static_cast<ColVecResult &>(arr_to.getData()).getData();
176
177 for (size_t i = 0; i < size; ++i)
178 {
179 if (!limit_num_elems)
180 {
181 data_to.push_back(data.get(i, size));
182 }
183 else
184 {
185 data_to.push_back(data.get(i, win_size));
186 }
187 }
188 }
189 }
190
191 bool allocatesMemoryInArena() const override
192 {
193 return true;
194 }
195};
196
197#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE
198
199}
200