#pragma once

#include <Common/Arena.h>
#include <Common/NaNUtils.h>

#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnArray.h>
#include <Common/assert_cast.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>

#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>

#include <AggregateFunctions/IAggregateFunction.h>

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <limits>
#include <queue>
#include <utility>
namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_LARGE_ARRAY_SIZE;
    extern const int INCORRECT_DATA;
}

/**
 * Histogram state based on the distance compression algorithm described in
 * http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
 */
class AggregateFunctionHistogramData
{
public:
    using Mean = Float64;
    using Weight = Float64;

    constexpr static size_t bins_count_limit = 250;

private:
    struct WeightedValue
    {
        Mean mean;
        Weight weight;

        // Fuse two bins: weights add up, the mean becomes the weighted average of the two means.
        WeightedValue operator+(const WeightedValue & other) const
        {
            return {mean + other.weight * (other.mean - mean) / (other.weight + weight), other.weight + weight};
        }
    };
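
    /* A worked instance of WeightedValue::operator+ above: fusing {mean = 1, weight = 1}
       with {mean = 3, weight = 3} gives mean = 1 + 3 * (3 - 1) / (3 + 1) = 2.5 and weight = 4,
       i.e. the centroid of the two bins. */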

private:
    // Number of stored weighted values
    UInt32 size;

    // Calculated lower and upper bounds of seen points
    Mean lower_bound;
    Mean upper_bound;

    // Weighted values representation of histogram.
    WeightedValue points[0];

private:
    void sort()
    {
        std::sort(points, points + size,
            [](const WeightedValue & first, const WeightedValue & second)
            {
                return first.mean < second.mean;
            });
    }

    template <typename T>
    struct PriorityQueueStorage
    {
        size_t size = 0;
        T * data_ptr;

        explicit PriorityQueueStorage(T * value)
            : data_ptr(value)
        {
        }

        void push_back(T val)
        {
            data_ptr[size] = std::move(val);
            ++size;
        }

        void pop_back() { --size; }
        T * begin() { return data_ptr; }
        T * end() const { return data_ptr + size; }
        bool empty() const { return size == 0; }
        T & front() { return *data_ptr; }
        const T & front() const { return *data_ptr; }

        using value_type = T;
        using reference = T &;
        using const_reference = const T &;
        using size_type = size_t;
    };
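
    /* PriorityQueueStorage implements just the subset of the container interface that
       std::priority_queue actually uses in compress() below (push_back, pop_back, front,
       empty, begin/end and the member typedefs), so the queue can operate on a
       caller-provided, preallocated buffer instead of a heap-allocating std::vector. */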

    /**
     * Repeatedly fuse the closest pair of values until at most max_bins bins are left
     */
    void compress(UInt32 max_bins)
    {
        sort();
        auto new_size = size;
        if (size <= max_bins)
            return;

        // Maintain doubly-linked list of "active" points
        // and store neighbour pairs in priority queue by distance
        UInt32 previous[size + 1];
        UInt32 next[size + 1];
        bool active[size + 1];
        std::fill(active, active + size, true);
        active[size] = false;

        auto delete_node = [&](UInt32 i)
        {
            previous[next[i]] = previous[i];
            next[previous[i]] = next[i];
            active[i] = false;
        };

        for (size_t i = 0; i <= size; ++i)
        {
            previous[i] = i - 1;
            next[i] = i + 1;
        }

        next[size] = 0;
        previous[0] = size;

        using QueueItem = std::pair<Mean, UInt32>;

        QueueItem storage[2 * size - max_bins];

        std::priority_queue<
            QueueItem,
            PriorityQueueStorage<QueueItem>,
            std::greater<QueueItem>>
            queue{std::greater<QueueItem>(),
                PriorityQueueStorage<QueueItem>(storage)};

        auto quality = [&](UInt32 i) { return points[next[i]].mean - points[i].mean; };

        for (size_t i = 0; i + 1 < size; ++i)
            queue.push({quality(i), i});

        while (new_size > max_bins && !queue.empty())
        {
            auto min_item = queue.top();
            queue.pop();
            auto left = min_item.second;
            auto right = next[left];

            // Queue entries are not removed eagerly; skip stale pairs whose nodes
            // were already fused or whose recorded distance is outdated.
            if (!active[left] || !active[right] || quality(left) > min_item.first)
                continue;

            points[left] = points[left] + points[right];

            delete_node(right);
            if (active[next[left]])
                queue.push({quality(left), left});
            if (active[previous[left]])
                queue.push({quality(previous[left]), previous[left]});

            --new_size;
        }

        // Compact the surviving points to the front of the array
        size_t left = 0;
        for (size_t right = 0; right < size; ++right)
        {
            if (active[right])
            {
                points[left] = points[right];
                ++left;
            }
        }
        size = new_size;
    }

    /**
     * Delete too close points from histogram.
     * Assumes that points are sorted.
     */
    void unique()
    {
        if (size == 0)
            return;

        size_t left = 0;

        for (auto right = left + 1; right < size; ++right)
        {
            // Fuse points if their text representations differ only in last digit
            auto min_diff = 10 * (points[left].mean + points[right].mean) * std::numeric_limits<Mean>::epsilon();
            if (points[left].mean + min_diff >= points[right].mean)
            {
                points[left] = points[left] + points[right];
            }
            else
            {
                ++left;
                points[left] = points[right];
            }
        }
        size = left + 1;
    }

public:
    AggregateFunctionHistogramData()
        : size(0)
        , lower_bound(std::numeric_limits<Mean>::max())
        , upper_bound(std::numeric_limits<Mean>::lowest())
    {
        static_assert(offsetof(AggregateFunctionHistogramData, points) == sizeof(AggregateFunctionHistogramData), "points should be last member");
    }

    static size_t structSize(size_t max_bins)
    {
        return sizeof(AggregateFunctionHistogramData) + max_bins * 2 * sizeof(WeightedValue);
    }
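
    /* A sketch of how this layout is meant to be consumed (an assumption drawn from structSize()
       and the flexible-array member above; the actual allocation is performed by the aggregate
       function framework, not by this class). Given a hypothetical caller-side Arena & arena:

         char * place = arena.alloc(AggregateFunctionHistogramData::structSize(max_bins));
         auto * data = new (place) AggregateFunctionHistogramData();

       This leaves room for up to 2 * max_bins WeightedValue entries directly behind the header. */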

    void insertResultInto(ColumnVector<Mean> & to_lower, ColumnVector<Mean> & to_upper, ColumnVector<Weight> & to_weights, UInt32 max_bins)
    {
        compress(max_bins);
        unique();

        for (size_t i = 0; i < size; ++i)
        {
            to_lower.insertValue((i == 0) ? lower_bound : (points[i].mean + points[i - 1].mean) / 2);
            to_upper.insertValue((i + 1 == size) ? upper_bound : (points[i].mean + points[i + 1].mean) / 2);

            // linear density approximation
            Weight lower_weight = (i == 0) ? points[i].weight : ((points[i - 1].weight) + points[i].weight * 3) / 4;
            Weight upper_weight = (i + 1 == size) ? points[i].weight : (points[i + 1].weight + points[i].weight * 3) / 4;
            to_weights.insertValue((lower_weight + upper_weight) / 2);
        }
    }

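    /**
     * Add a new point to the histogram. Points are buffered and only compressed back to
     * max_bins bins once the buffer reaches 2 * max_bins entries, so the cost of compress()
     * is amortized over many insertions (e.g. with max_bins = 20, at most one compression
     * per 20 inserted values in the steady state).
     */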
    void add(Mean value, Weight weight, UInt32 max_bins)
    {
        // nans break sort and compression
        // infs don't fit in bins partition method
        if (!isFinite(value))
            throw Exception("Invalid value (inf or nan) for aggregation by 'histogram' function", ErrorCodes::INCORRECT_DATA);

        points[size] = {value, weight};
        ++size;
        lower_bound = std::min(lower_bound, value);
        upper_bound = std::max(upper_bound, value);

        if (size >= max_bins * 2)
            compress(max_bins);
    }

    void merge(const AggregateFunctionHistogramData & other, UInt32 max_bins)
    {
        lower_bound = std::min(lower_bound, other.lower_bound);
        upper_bound = std::max(upper_bound, other.upper_bound);
        for (size_t i = 0; i < other.size; ++i)
            add(other.points[i].mean, other.points[i].weight, max_bins);
    }

    void write(WriteBuffer & buf) const
    {
        writeBinary(lower_bound, buf);
        writeBinary(upper_bound, buf);

        writeVarUInt(size, buf);
        buf.write(reinterpret_cast<const char *>(points), size * sizeof(WeightedValue));
    }

    void read(ReadBuffer & buf, UInt32 max_bins)
    {
        readBinary(lower_bound, buf);
        readBinary(upper_bound, buf);

        readVarUInt(size, buf);
        if (size > max_bins * 2)
            throw Exception("Too many bins", ErrorCodes::TOO_LARGE_ARRAY_SIZE);

        buf.read(reinterpret_cast<char *>(points), size * sizeof(WeightedValue));
    }
};

template <typename T>
class AggregateFunctionHistogram final : public IAggregateFunctionDataHelper<AggregateFunctionHistogramData, AggregateFunctionHistogram<T>>
{
private:
    using Data = AggregateFunctionHistogramData;

    const UInt32 max_bins;

public:
    AggregateFunctionHistogram(UInt32 max_bins_, const DataTypes & arguments, const Array & params)
        : IAggregateFunctionDataHelper<AggregateFunctionHistogramData, AggregateFunctionHistogram<T>>(arguments, params)
        , max_bins(max_bins_)
    {
    }

    size_t sizeOfData() const override
    {
        return Data::structSize(max_bins);
    }

    DataTypePtr getReturnType() const override
    {
        DataTypes types;
        auto mean = std::make_shared<DataTypeNumber<Data::Mean>>();
        auto weight = std::make_shared<DataTypeNumber<Data::Weight>>();

        // lower bound
        types.emplace_back(mean);
        // upper bound
        types.emplace_back(mean);
        // weight
        types.emplace_back(weight);

        auto tuple = std::make_shared<DataTypeTuple>(types);
        return std::make_shared<DataTypeArray>(tuple);
    }

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        auto val = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
        this->data(place).add(static_cast<Data::Mean>(val), 1, max_bins);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        this->data(place).merge(this->data(rhs), max_bins);
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).read(buf, max_bins);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & data = this->data(const_cast<AggregateDataPtr>(place));

        auto & to_array = assert_cast<ColumnArray &>(to);
        ColumnArray::Offsets & offsets_to = to_array.getOffsets();
        auto & to_tuple = assert_cast<ColumnTuple &>(to_array.getData());

        auto & to_lower = assert_cast<ColumnVector<Data::Mean> &>(to_tuple.getColumn(0));
        auto & to_upper = assert_cast<ColumnVector<Data::Mean> &>(to_tuple.getColumn(1));
        auto & to_weights = assert_cast<ColumnVector<Data::Weight> &>(to_tuple.getColumn(2));
        data.insertResultInto(to_lower, to_upper, to_weights, max_bins);

        offsets_to.push_back(to_tuple.size());
    }

    String getName() const override { return "histogram"; }
};
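
/* A minimal construction sketch (illustrative assumption; in ClickHouse this object is normally
   created by the factory registered for the "histogram" function rather than by hand):

     DataTypes argument_types{std::make_shared<DataTypeUInt64>()};
     Array params;
     params.emplace_back(UInt64(20));  // the requested number of bins
     auto histogram = std::make_shared<AggregateFunctionHistogram<UInt64>>(20, argument_types, params);

   The produced column type is Array(Tuple(Float64, Float64, Float64)): lower bound, upper bound
   and weight of each bin, as assembled in insertResultInto() above. */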

}