1 | #pragma once |
2 | |
3 | #include <AggregateFunctions/ReservoirSampler.h> |
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | |
namespace ErrorCodes
{
    /// Only declared here (the definition lives elsewhere); used by the weighted `add` overload below,
    /// which always throws with this code.
    extern const int NOT_IMPLEMENTED;
}
13 | |
14 | /** Quantile calculation with "reservoir sample" algorithm. |
15 | * It collects pseudorandom subset of limited size from a stream of values, |
16 | * and approximate quantile from it. |
17 | * The result is non-deterministic. Also look at QuantileReservoirSamplerDeterministic. |
18 | * |
19 | * This algorithm is quite inefficient in terms of precision for memory usage, |
20 | * but very efficient in CPU (though less efficient than QuantileTiming and than QuantileExact for small sets). |
21 | */ |
22 | template <typename Value> |
23 | struct QuantileReservoirSampler |
24 | { |
25 | using Data = ReservoirSampler<Value, ReservoirSamplerOnEmpty::RETURN_NAN_OR_ZERO>; |
26 | Data data; |
27 | |
28 | void add(const Value & x) |
29 | { |
30 | data.insert(x); |
31 | } |
32 | |
33 | template <typename Weight> |
34 | void add(const Value &, const Weight &) |
35 | { |
36 | throw Exception("Method add with weight is not implemented for ReservoirSampler" , ErrorCodes::NOT_IMPLEMENTED); |
37 | } |
38 | |
39 | void merge(const QuantileReservoirSampler & rhs) |
40 | { |
41 | data.merge(rhs.data); |
42 | } |
43 | |
44 | void serialize(WriteBuffer & buf) const |
45 | { |
46 | data.write(buf); |
47 | } |
48 | |
49 | void deserialize(ReadBuffer & buf) |
50 | { |
51 | data.read(buf); |
52 | } |
53 | |
54 | /// Get the value of the `level` quantile. The level must be between 0 and 1. |
55 | Value get(Float64 level) |
56 | { |
57 | return data.quantileInterpolated(level); |
58 | } |
59 | |
60 | /// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address. |
61 | /// indices - an array of index levels such that the corresponding elements will go in ascending order. |
62 | void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result) |
63 | { |
64 | for (size_t i = 0; i < size; ++i) |
65 | result[indices[i]] = data.quantileInterpolated(levels[indices[i]]); |
66 | } |
67 | |
68 | /// The same, but in the case of an empty state, NaN is returned. |
69 | Float64 getFloat(Float64 level) |
70 | { |
71 | return data.quantileInterpolated(level); |
72 | } |
73 | |
74 | void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result) |
75 | { |
76 | for (size_t i = 0; i < size; ++i) |
77 | result[indices[i]] = data.quantileInterpolated(levels[indices[i]]); |
78 | } |
79 | }; |
80 | |
81 | } |
82 | |