1#pragma once
2
3#include <AggregateFunctions/ReservoirSampler.h>
4
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11 extern const int NOT_IMPLEMENTED;
12}
13
14/** Quantile calculation with "reservoir sample" algorithm.
15 * It collects pseudorandom subset of limited size from a stream of values,
16 * and approximate quantile from it.
17 * The result is non-deterministic. Also look at QuantileReservoirSamplerDeterministic.
18 *
19 * This algorithm is quite inefficient in terms of precision for memory usage,
20 * but very efficient in CPU (though less efficient than QuantileTiming and than QuantileExact for small sets).
21 */
22template <typename Value>
23struct QuantileReservoirSampler
24{
25 using Data = ReservoirSampler<Value, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO>;
26 Data data;
27
28 void add(const Value & x)
29 {
30 data.insert(x);
31 }
32
33 template <typename Weight>
34 void add(const Value &, const Weight &)
35 {
36 throw Exception("Method add with weight is not implemented for ReservoirSampler", ErrorCodes::NOT_IMPLEMENTED);
37 }
38
39 void merge(const QuantileReservoirSampler & rhs)
40 {
41 data.merge(rhs.data);
42 }
43
44 void serialize(WriteBuffer & buf) const
45 {
46 data.write(buf);
47 }
48
49 void deserialize(ReadBuffer & buf)
50 {
51 data.read(buf);
52 }
53
54 /// Get the value of the `level` quantile. The level must be between 0 and 1.
55 Value get(Float64 level)
56 {
57 return data.quantileInterpolated(level);
58 }
59
60 /// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
61 /// indices - an array of index levels such that the corresponding elements will go in ascending order.
62 void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result)
63 {
64 for (size_t i = 0; i < size; ++i)
65 result[indices[i]] = data.quantileInterpolated(levels[indices[i]]);
66 }
67
68 /// The same, but in the case of an empty state, NaN is returned.
69 Float64 getFloat(Float64 level)
70 {
71 return data.quantileInterpolated(level);
72 }
73
74 void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
75 {
76 for (size_t i = 0; i < size; ++i)
77 result[indices[i]] = data.quantileInterpolated(levels[indices[i]]);
78 }
79};
80
81}
82