1 | #pragma once |
2 | |
3 | #include <limits> |
4 | #include <algorithm> |
5 | #include <climits> |
6 | #include <sstream> |
7 | #include <common/Types.h> |
8 | #include <IO/ReadBuffer.h> |
9 | #include <IO/ReadHelpers.h> |
10 | #include <IO/WriteHelpers.h> |
11 | #include <Common/PODArray.h> |
12 | #include <Common/NaNUtils.h> |
13 | #include <Poco/Exception.h> |
14 | #include <pcg_random.hpp> |
15 | |
16 | |
17 | /// Implementing the Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the sample_count size. |
18 | /// Can approximately get quantiles. |
19 | /// Call `quantile` takes O(sample_count log sample_count), if after the previous call `quantile` there was at least one call `insert`. Otherwise O(1). |
20 | /// That is, it makes sense to first add, then get quantiles without adding. |
21 | |
22 | const size_t DEFAULT_SAMPLE_COUNT = 8192; |
23 | |
24 | /// What if there is not a single value - throw an exception, or return 0 or NaN in the case of double? |
25 | namespace ReservoirSamplerOnEmpty |
26 | { |
27 | enum Enum |
28 | { |
29 | THROW, |
30 | RETURN_NAN_OR_ZERO, |
31 | }; |
32 | } |
33 | |
34 | template <typename ResultType, bool is_float> |
35 | struct NanLikeValueConstructor |
36 | { |
37 | static ResultType getValue() |
38 | { |
39 | return std::numeric_limits<ResultType>::quiet_NaN(); |
40 | } |
41 | }; |
42 | template <typename ResultType> |
43 | struct NanLikeValueConstructor<ResultType, false> |
44 | { |
45 | static ResultType getValue() |
46 | { |
47 | return ResultType(); |
48 | } |
49 | }; |
50 | |
51 | template <typename T, ReservoirSamplerOnEmpty::Enum OnEmpty = ReservoirSamplerOnEmpty::THROW, typename Comparer = std::less<T>> |
52 | class ReservoirSampler |
53 | { |
54 | public: |
55 | ReservoirSampler(size_t sample_count_ = DEFAULT_SAMPLE_COUNT) |
56 | : sample_count(sample_count_) |
57 | { |
58 | rng.seed(123456); |
59 | } |
60 | |
61 | void clear() |
62 | { |
63 | samples.clear(); |
64 | sorted = false; |
65 | total_values = 0; |
66 | rng.seed(123456); |
67 | } |
68 | |
69 | void insert(const T & v) |
70 | { |
71 | if (isNaN(v)) |
72 | return; |
73 | |
74 | sorted = false; |
75 | ++total_values; |
76 | if (samples.size() < sample_count) |
77 | { |
78 | samples.push_back(v); |
79 | } |
80 | else |
81 | { |
82 | UInt64 rnd = genRandom(total_values); |
83 | if (rnd < sample_count) |
84 | samples[rnd] = v; |
85 | } |
86 | } |
87 | |
88 | size_t size() const |
89 | { |
90 | return total_values; |
91 | } |
92 | |
93 | T quantileNearest(double level) |
94 | { |
95 | if (samples.empty()) |
96 | return onEmpty<T>(); |
97 | |
98 | sortIfNeeded(); |
99 | |
100 | double index = level * (samples.size() - 1); |
101 | size_t int_index = static_cast<size_t>(index + 0.5); |
102 | int_index = std::max(0LU, std::min(samples.size() - 1, int_index)); |
103 | return samples[int_index]; |
104 | } |
105 | |
106 | /** If T is not a numeric type, using this method causes a compilation error, |
107 | * but use of error class does not. SFINAE. |
108 | */ |
109 | double quantileInterpolated(double level) |
110 | { |
111 | if (samples.empty()) |
112 | { |
113 | if (DB::IsDecimalNumber<T>) |
114 | return 0; |
115 | return onEmpty<double>(); |
116 | } |
117 | sortIfNeeded(); |
118 | |
119 | double index = std::max(0., std::min(samples.size() - 1., level * (samples.size() - 1))); |
120 | |
121 | /// To get the value of a fractional index, we linearly interpolate between neighboring values. |
122 | size_t left_index = static_cast<size_t>(index); |
123 | size_t right_index = left_index + 1; |
124 | if (right_index == samples.size()) |
125 | return samples[left_index]; |
126 | |
127 | double left_coef = right_index - index; |
128 | double right_coef = index - left_index; |
129 | |
130 | return samples[left_index] * left_coef + samples[right_index] * right_coef; |
131 | } |
132 | |
133 | void merge(const ReservoirSampler<T, OnEmpty> & b) |
134 | { |
135 | if (sample_count != b.sample_count) |
136 | throw Poco::Exception("Cannot merge ReservoirSampler's with different sample_count" ); |
137 | sorted = false; |
138 | |
139 | if (b.total_values <= sample_count) |
140 | { |
141 | for (size_t i = 0; i < b.samples.size(); ++i) |
142 | insert(b.samples[i]); |
143 | } |
144 | else if (total_values <= sample_count) |
145 | { |
146 | Array from = std::move(samples); |
147 | samples.assign(b.samples.begin(), b.samples.end()); |
148 | total_values = b.total_values; |
149 | for (size_t i = 0; i < from.size(); ++i) |
150 | insert(from[i]); |
151 | } |
152 | else |
153 | { |
154 | randomShuffle(samples); |
155 | total_values += b.total_values; |
156 | for (size_t i = 0; i < sample_count; ++i) |
157 | { |
158 | UInt64 rnd = genRandom(total_values); |
159 | if (rnd < b.total_values) |
160 | samples[i] = b.samples[i]; |
161 | } |
162 | } |
163 | } |
164 | |
165 | void read(DB::ReadBuffer & buf) |
166 | { |
167 | DB::readIntBinary<size_t>(sample_count, buf); |
168 | DB::readIntBinary<size_t>(total_values, buf); |
169 | samples.resize(std::min(total_values, sample_count)); |
170 | |
171 | std::string rng_string; |
172 | DB::readStringBinary(rng_string, buf); |
173 | std::istringstream rng_stream(rng_string); |
174 | rng_stream >> rng; |
175 | |
176 | for (size_t i = 0; i < samples.size(); ++i) |
177 | DB::readBinary(samples[i], buf); |
178 | |
179 | sorted = false; |
180 | } |
181 | |
182 | void write(DB::WriteBuffer & buf) const |
183 | { |
184 | DB::writeIntBinary<size_t>(sample_count, buf); |
185 | DB::writeIntBinary<size_t>(total_values, buf); |
186 | |
187 | std::ostringstream rng_stream; |
188 | rng_stream << rng; |
189 | DB::writeStringBinary(rng_stream.str(), buf); |
190 | |
191 | for (size_t i = 0; i < std::min(sample_count, total_values); ++i) |
192 | DB::writeBinary(samples[i], buf); |
193 | } |
194 | |
195 | private: |
196 | friend void qdigest_test(int normal_size, UInt64 value_limit, const std::vector<UInt64> & values, int queries_count, bool verbose); |
197 | friend void rs_perf_test(); |
198 | |
199 | /// We allocate a little memory on the stack - to avoid allocations when there are many objects with a small number of elements. |
200 | using Array = DB::PODArrayWithStackMemory<T, 64>; |
201 | |
202 | size_t sample_count; |
203 | size_t total_values = 0; |
204 | Array samples; |
205 | pcg32_fast rng; |
206 | bool sorted = false; |
207 | |
208 | |
209 | UInt64 genRandom(size_t lim) |
210 | { |
211 | /// With a large number of values, we will generate random numbers several times slower. |
212 | if (lim <= static_cast<UInt64>(rng.max())) |
213 | return static_cast<UInt32>(rng()) % static_cast<UInt32>(lim); |
214 | else |
215 | return (static_cast<UInt64>(rng()) * (static_cast<UInt64>(rng.max()) + 1ULL) + static_cast<UInt64>(rng())) % lim; |
216 | } |
217 | |
218 | void randomShuffle(Array & v) |
219 | { |
220 | for (size_t i = 1; i < v.size(); ++i) |
221 | { |
222 | size_t j = genRandom(i + 1); |
223 | std::swap(v[i], v[j]); |
224 | } |
225 | } |
226 | |
227 | void sortIfNeeded() |
228 | { |
229 | if (sorted) |
230 | return; |
231 | sorted = true; |
232 | std::sort(samples.begin(), samples.end(), Comparer()); |
233 | } |
234 | |
235 | template <typename ResultType> |
236 | ResultType onEmpty() const |
237 | { |
238 | if (OnEmpty == ReservoirSamplerOnEmpty::THROW) |
239 | throw Poco::Exception("Quantile of empty ReservoirSampler" ); |
240 | else |
241 | return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue(); |
242 | } |
243 | }; |
244 | |