1#pragma once
2
3#include <bitset>
4#include <iostream>
5#include <map>
6#include <queue>
7#include <sstream>
8#include <unordered_set>
9#include <utility>
10#include <Columns/ColumnArray.h>
11#include <Columns/ColumnTuple.h>
12#include <Columns/ColumnsNumber.h>
13#include <DataTypes/DataTypeArray.h>
14#include <DataTypes/DataTypeTuple.h>
15#include <DataTypes/DataTypesNumber.h>
16#include <IO/ReadHelpers.h>
17#include <IO/WriteHelpers.h>
18#include <Common/ArenaAllocator.h>
19#include <Common/assert_cast.h>
20#include <ext/range.h>
21#include "IAggregateFunction.h"
22
23
24namespace DB
25{
namespace ErrorCodes
{
    /// Error codes referenced by this header. These are declarations only (`extern`);
    /// the definitions are not part of this file.
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
    /// Used for the out-of-order timestamp error and for the argument type checks below.
    /// Declared here so the header does not depend on another header declaring them first.
    extern const int LOGICAL_ERROR;
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
31template <bool rate>
32struct AggregateFunctionTimeSeriesGroupSumData
33{
34 using DataPoint = std::pair<Int64, Float64>;
35 struct Points
36 {
37 using Dps = std::queue<DataPoint>;
38 Dps dps;
39 void add(Int64 t, Float64 v)
40 {
41 dps.push(std::make_pair(t, v));
42 if (dps.size() > 2)
43 dps.pop();
44 }
45 Float64 getval(Int64 t)
46 {
47 Int64 t1, t2;
48 Float64 v1, v2;
49 if (rate)
50 {
51 if (dps.size() < 2)
52 return 0;
53 t1 = dps.back().first;
54 t2 = dps.front().first;
55 v1 = dps.back().second;
56 v2 = dps.front().second;
57 return (v1 - v2) / Float64(t1 - t2);
58 }
59 else
60 {
61 if (dps.size() == 1 && t == dps.front().first)
62 return dps.front().second;
63 t1 = dps.back().first;
64 t2 = dps.front().first;
65 v1 = dps.back().second;
66 v2 = dps.front().second;
67 return v2 + ((v1 - v2) * Float64(t - t2)) / Float64(t1 - t2);
68 }
69 }
70 };
71
72 typedef std::map<UInt64, Points> Series;
73 typedef PODArrayWithStackMemory<DataPoint, 128> AggSeries;
74 Series ss;
75 AggSeries result;
76
77 void add(UInt64 uid, Int64 t, Float64 v)
78 { //suppose t is coming asc
79 typename Series::iterator it_ss;
80 if (ss.count(uid) == 0)
81 { //time series not exist, insert new one
82 Points tmp;
83 tmp.add(t, v);
84 ss.emplace(uid, tmp);
85 it_ss = ss.find(uid);
86 }
87 else
88 {
89 it_ss = ss.find(uid);
90 it_ss->second.add(t, v);
91 }
92 if (result.size() > 0 && t < result.back().first)
93 throw Exception{"timeSeriesGroupSum or timeSeriesGroupRateSum must order by timestamp asc!!!", ErrorCodes::LOGICAL_ERROR};
94 if (result.size() > 0 && t == result.back().first)
95 {
96 //do not add new point
97 if (rate)
98 result.back().second += it_ss->second.getval(t);
99 else
100 result.back().second += v;
101 }
102 else
103 {
104 if (rate)
105 result.emplace_back(std::make_pair(t, it_ss->second.getval(t)));
106 else
107 result.emplace_back(std::make_pair(t, v));
108 }
109 size_t i = result.size() - 1;
110 //reverse find out the index of timestamp that more than previous timestamp of t
111 while (result[i].first > it_ss->second.dps.front().first && i >= 0)
112 i--;
113
114 i++;
115 while (i < result.size() - 1)
116 {
117 result[i].second += it_ss->second.getval(result[i].first);
118 i++;
119 }
120 }
121
122 void merge(const AggregateFunctionTimeSeriesGroupSumData & other)
123 {
124 //if ts has overlap, then aggregate two series by interpolation;
125 AggSeries tmp;
126 tmp.reserve(other.result.size() + result.size());
127 size_t i = 0, j = 0;
128 Int64 t1, t2;
129 Float64 v1, v2;
130 while (i < result.size() && j < other.result.size())
131 {
132 if (result[i].first < other.result[j].first)
133 {
134 if (j == 0)
135 {
136 tmp.emplace_back(result[i]);
137 }
138 else
139 {
140 t1 = other.result[j].first;
141 t2 = other.result[j - 1].first;
142 v1 = other.result[j].second;
143 v2 = other.result[j - 1].second;
144 Float64 value = result[i].second + v2 + (v1 - v2) * (Float64(result[i].first - t2)) / Float64(t1 - t2);
145 tmp.emplace_back(std::make_pair(result[i].first, value));
146 }
147 i++;
148 }
149 else if (result[i].first > other.result[j].first)
150 {
151 if (i == 0)
152 {
153 tmp.emplace_back(other.result[j]);
154 }
155 else
156 {
157 t1 = result[i].first;
158 t2 = result[i - 1].first;
159 v1 = result[i].second;
160 v2 = result[i - 1].second;
161 Float64 value = other.result[j].second + v2 + (v1 - v2) * (Float64(other.result[j].first - t2)) / Float64(t1 - t2);
162 tmp.emplace_back(std::make_pair(other.result[j].first, value));
163 }
164 j++;
165 }
166 else
167 {
168 tmp.emplace_back(std::make_pair(result[i].first, result[i].second + other.result[j].second));
169 i++;
170 j++;
171 }
172 }
173 while (i < result.size())
174 {
175 tmp.emplace_back(result[i]);
176 i++;
177 }
178 while (j < other.result.size())
179 {
180 tmp.push_back(other.result[j]);
181 j++;
182 }
183 swap(result, tmp);
184 }
185
186 void serialize(WriteBuffer & buf) const
187 {
188 size_t size = result.size();
189 writeVarUInt(size, buf);
190 buf.write(reinterpret_cast<const char *>(result.data()), sizeof(result[0]));
191 }
192
193 void deserialize(ReadBuffer & buf)
194 {
195 size_t size = 0;
196 readVarUInt(size, buf);
197 result.resize(size);
198 buf.read(reinterpret_cast<char *>(result.data()), size * sizeof(result[0]));
199 }
200};
201template <bool rate>
202class AggregateFunctionTimeSeriesGroupSum final
203 : public IAggregateFunctionDataHelper<AggregateFunctionTimeSeriesGroupSumData<rate>, AggregateFunctionTimeSeriesGroupSum<rate>>
204{
205private:
206public:
207 String getName() const override { return rate ? "timeSeriesGroupRateSum" : "timeSeriesGroupSum"; }
208
209 AggregateFunctionTimeSeriesGroupSum(const DataTypes & arguments)
210 : IAggregateFunctionDataHelper<AggregateFunctionTimeSeriesGroupSumData<rate>, AggregateFunctionTimeSeriesGroupSum<rate>>(arguments, {})
211 {
212 if (!WhichDataType(arguments[0].get()).isUInt64())
213 throw Exception{"Illegal type " + arguments[0].get()->getName() + " of argument 1 of aggregate function " + getName()
214 + ", must be UInt64",
215 ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
216
217 if (!WhichDataType(arguments[1].get()).isInt64())
218 throw Exception{"Illegal type " + arguments[1].get()->getName() + " of argument 2 of aggregate function " + getName()
219 + ", must be Int64",
220 ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
221
222 if (!WhichDataType(arguments[2].get()).isFloat64())
223 throw Exception{"Illegal type " + arguments[2].get()->getName() + " of argument 3 of aggregate function " + getName()
224 + ", must be Float64",
225 ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
226 }
227
228 DataTypePtr getReturnType() const override
229 {
230 auto datatypes = std::vector<DataTypePtr>();
231 datatypes.push_back(std::make_shared<DataTypeInt64>());
232 datatypes.push_back(std::make_shared<DataTypeFloat64>());
233
234 return std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(datatypes));
235 }
236
237 void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
238 {
239 auto uid = assert_cast<const ColumnVector<UInt64> *>(columns[0])->getData()[row_num];
240 auto ts = assert_cast<const ColumnVector<Int64> *>(columns[1])->getData()[row_num];
241 auto val = assert_cast<const ColumnVector<Float64> *>(columns[2])->getData()[row_num];
242 if (uid && ts && val)
243 {
244 this->data(place).add(uid, ts, val);
245 }
246 }
247
248 void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); }
249
250 void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).serialize(buf); }
251
252 void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); }
253
254 void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
255 {
256 const auto & value = this->data(place).result;
257 size_t size = value.size();
258
259 ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
260 ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
261 size_t old_size = offsets_to.back();
262
263 offsets_to.push_back(offsets_to.back() + size);
264
265 if (size)
266 {
267 typename ColumnInt64::Container & ts_to
268 = assert_cast<ColumnInt64 &>(assert_cast<ColumnTuple &>(arr_to.getData()).getColumn(0)).getData();
269 typename ColumnFloat64::Container & val_to
270 = assert_cast<ColumnFloat64 &>(assert_cast<ColumnTuple &>(arr_to.getData()).getColumn(1)).getData();
271 ts_to.reserve(old_size + size);
272 val_to.reserve(old_size + size);
273 size_t i = 0;
274 while (i < this->data(place).result.size())
275 {
276 ts_to.push_back(this->data(place).result[i].first);
277 val_to.push_back(this->data(place).result[i].second);
278 i++;
279 }
280 }
281 }
282
283 bool allocatesMemoryInArena() const override { return true; }
284};
285}
286