1#pragma once
2
3#include <limits>
4#include <algorithm>
5#include <climits>
6#include <sstream>
7#include <common/Types.h>
8#include <IO/ReadBuffer.h>
9#include <IO/ReadHelpers.h>
10#include <IO/WriteHelpers.h>
11#include <Common/PODArray.h>
12#include <Common/NaNUtils.h>
13#include <Poco/Exception.h>
14#include <pcg_random.hpp>
15
16
17/// Implementing the Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the sample_count size.
18/// Can approximately get quantiles.
19/// Call `quantile` takes O(sample_count log sample_count), if after the previous call `quantile` there was at least one call `insert`. Otherwise O(1).
20/// That is, it makes sense to first add, then get quantiles without adding.
21
22const size_t DEFAULT_SAMPLE_COUNT = 8192;
23
24/// What if there is not a single value - throw an exception, or return 0 or NaN in the case of double?
25namespace ReservoirSamplerOnEmpty
26{
27 enum Enum
28 {
29 THROW,
30 RETURN_NAN_OR_ZERO,
31 };
32}
33
34template <typename ResultType, bool is_float>
35struct NanLikeValueConstructor
36{
37 static ResultType getValue()
38 {
39 return std::numeric_limits<ResultType>::quiet_NaN();
40 }
41};
42template <typename ResultType>
43struct NanLikeValueConstructor<ResultType, false>
44{
45 static ResultType getValue()
46 {
47 return ResultType();
48 }
49};
50
51template <typename T, ReservoirSamplerOnEmpty::Enum OnEmpty = ReservoirSamplerOnEmpty::THROW, typename Comparer = std::less<T>>
52class ReservoirSampler
53{
54public:
55 ReservoirSampler(size_t sample_count_ = DEFAULT_SAMPLE_COUNT)
56 : sample_count(sample_count_)
57 {
58 rng.seed(123456);
59 }
60
61 void clear()
62 {
63 samples.clear();
64 sorted = false;
65 total_values = 0;
66 rng.seed(123456);
67 }
68
69 void insert(const T & v)
70 {
71 if (isNaN(v))
72 return;
73
74 sorted = false;
75 ++total_values;
76 if (samples.size() < sample_count)
77 {
78 samples.push_back(v);
79 }
80 else
81 {
82 UInt64 rnd = genRandom(total_values);
83 if (rnd < sample_count)
84 samples[rnd] = v;
85 }
86 }
87
88 size_t size() const
89 {
90 return total_values;
91 }
92
93 T quantileNearest(double level)
94 {
95 if (samples.empty())
96 return onEmpty<T>();
97
98 sortIfNeeded();
99
100 double index = level * (samples.size() - 1);
101 size_t int_index = static_cast<size_t>(index + 0.5);
102 int_index = std::max(0LU, std::min(samples.size() - 1, int_index));
103 return samples[int_index];
104 }
105
106 /** If T is not a numeric type, using this method causes a compilation error,
107 * but use of error class does not. SFINAE.
108 */
109 double quantileInterpolated(double level)
110 {
111 if (samples.empty())
112 {
113 if (DB::IsDecimalNumber<T>)
114 return 0;
115 return onEmpty<double>();
116 }
117 sortIfNeeded();
118
119 double index = std::max(0., std::min(samples.size() - 1., level * (samples.size() - 1)));
120
121 /// To get the value of a fractional index, we linearly interpolate between neighboring values.
122 size_t left_index = static_cast<size_t>(index);
123 size_t right_index = left_index + 1;
124 if (right_index == samples.size())
125 return samples[left_index];
126
127 double left_coef = right_index - index;
128 double right_coef = index - left_index;
129
130 return samples[left_index] * left_coef + samples[right_index] * right_coef;
131 }
132
133 void merge(const ReservoirSampler<T, OnEmpty> & b)
134 {
135 if (sample_count != b.sample_count)
136 throw Poco::Exception("Cannot merge ReservoirSampler's with different sample_count");
137 sorted = false;
138
139 if (b.total_values <= sample_count)
140 {
141 for (size_t i = 0; i < b.samples.size(); ++i)
142 insert(b.samples[i]);
143 }
144 else if (total_values <= sample_count)
145 {
146 Array from = std::move(samples);
147 samples.assign(b.samples.begin(), b.samples.end());
148 total_values = b.total_values;
149 for (size_t i = 0; i < from.size(); ++i)
150 insert(from[i]);
151 }
152 else
153 {
154 randomShuffle(samples);
155 total_values += b.total_values;
156 for (size_t i = 0; i < sample_count; ++i)
157 {
158 UInt64 rnd = genRandom(total_values);
159 if (rnd < b.total_values)
160 samples[i] = b.samples[i];
161 }
162 }
163 }
164
165 void read(DB::ReadBuffer & buf)
166 {
167 DB::readIntBinary<size_t>(sample_count, buf);
168 DB::readIntBinary<size_t>(total_values, buf);
169 samples.resize(std::min(total_values, sample_count));
170
171 std::string rng_string;
172 DB::readStringBinary(rng_string, buf);
173 std::istringstream rng_stream(rng_string);
174 rng_stream >> rng;
175
176 for (size_t i = 0; i < samples.size(); ++i)
177 DB::readBinary(samples[i], buf);
178
179 sorted = false;
180 }
181
182 void write(DB::WriteBuffer & buf) const
183 {
184 DB::writeIntBinary<size_t>(sample_count, buf);
185 DB::writeIntBinary<size_t>(total_values, buf);
186
187 std::ostringstream rng_stream;
188 rng_stream << rng;
189 DB::writeStringBinary(rng_stream.str(), buf);
190
191 for (size_t i = 0; i < std::min(sample_count, total_values); ++i)
192 DB::writeBinary(samples[i], buf);
193 }
194
195private:
196 friend void qdigest_test(int normal_size, UInt64 value_limit, const std::vector<UInt64> & values, int queries_count, bool verbose);
197 friend void rs_perf_test();
198
199 /// We allocate a little memory on the stack - to avoid allocations when there are many objects with a small number of elements.
200 using Array = DB::PODArrayWithStackMemory<T, 64>;
201
202 size_t sample_count;
203 size_t total_values = 0;
204 Array samples;
205 pcg32_fast rng;
206 bool sorted = false;
207
208
209 UInt64 genRandom(size_t lim)
210 {
211 /// With a large number of values, we will generate random numbers several times slower.
212 if (lim <= static_cast<UInt64>(rng.max()))
213 return static_cast<UInt32>(rng()) % static_cast<UInt32>(lim);
214 else
215 return (static_cast<UInt64>(rng()) * (static_cast<UInt64>(rng.max()) + 1ULL) + static_cast<UInt64>(rng())) % lim;
216 }
217
218 void randomShuffle(Array & v)
219 {
220 for (size_t i = 1; i < v.size(); ++i)
221 {
222 size_t j = genRandom(i + 1);
223 std::swap(v[i], v[j]);
224 }
225 }
226
227 void sortIfNeeded()
228 {
229 if (sorted)
230 return;
231 sorted = true;
232 std::sort(samples.begin(), samples.end(), Comparer());
233 }
234
235 template <typename ResultType>
236 ResultType onEmpty() const
237 {
238 if (OnEmpty == ReservoirSamplerOnEmpty::THROW)
239 throw Poco::Exception("Quantile of empty ReservoirSampler");
240 else
241 return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
242 }
243};
244