1 | #pragma once |
2 | |
3 | #include <limits> |
4 | #include <algorithm> |
5 | #include <climits> |
6 | #include <sstream> |
7 | #include <AggregateFunctions/ReservoirSampler.h> |
8 | #include <common/Types.h> |
9 | #include <Common/HashTable/Hash.h> |
10 | #include <IO/ReadBuffer.h> |
11 | #include <IO/ReadHelpers.h> |
12 | #include <IO/WriteHelpers.h> |
13 | #include <Common/PODArray.h> |
14 | #include <Common/NaNUtils.h> |
15 | #include <Poco/Exception.h> |
16 | |
17 | |
18 | /// Implementation of Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the `sample_count` size. |
19 | /// Can approximately get quantiles. |
20 | /// The `quantile` call takes O(sample_count log sample_count), if after the previous call `quantile` there was at least one call to insert. Otherwise, O(1). |
21 | /// That is, it makes sense to first add, then get quantiles without adding. |
22 | |
23 | |
24 | namespace DB |
25 | { |
26 | namespace ErrorCodes |
27 | { |
28 | extern const int MEMORY_LIMIT_EXCEEDED; |
29 | } |
30 | } |
31 | |
32 | |
33 | namespace detail |
34 | { |
35 | const size_t DEFAULT_SAMPLE_COUNT = 8192; |
36 | const auto MAX_SKIP_DEGREE = sizeof(UInt32) * 8; |
37 | } |
38 | |
39 | /// What if there is not a single value - throw an exception, or return 0 or NaN in the case of double? |
40 | enum class ReservoirSamplerDeterministicOnEmpty |
41 | { |
42 | THROW, |
43 | RETURN_NAN_OR_ZERO, |
44 | }; |
45 | |
46 | template <typename T, |
47 | ReservoirSamplerDeterministicOnEmpty OnEmpty = ReservoirSamplerDeterministicOnEmpty::THROW> |
48 | class ReservoirSamplerDeterministic |
49 | { |
50 | bool good(const UInt32 hash) |
51 | { |
52 | return hash == ((hash >> skip_degree) << skip_degree); |
53 | } |
54 | |
55 | public: |
56 | ReservoirSamplerDeterministic(const size_t sample_count_ = DEFAULT_SAMPLE_COUNT) |
57 | : sample_count{sample_count_} |
58 | { |
59 | } |
60 | |
61 | void clear() |
62 | { |
63 | samples.clear(); |
64 | sorted = false; |
65 | total_values = 0; |
66 | } |
67 | |
68 | void insert(const T & v, const UInt64 determinator) |
69 | { |
70 | if (isNaN(v)) |
71 | return; |
72 | |
73 | const UInt32 hash = intHash64(determinator); |
74 | if (!good(hash)) |
75 | return; |
76 | |
77 | insertImpl(v, hash); |
78 | sorted = false; |
79 | ++total_values; |
80 | } |
81 | |
82 | size_t size() const |
83 | { |
84 | return total_values; |
85 | } |
86 | |
87 | T quantileNearest(double level) |
88 | { |
89 | if (samples.empty()) |
90 | return onEmpty<T>(); |
91 | |
92 | sortIfNeeded(); |
93 | |
94 | double index = level * (samples.size() - 1); |
95 | size_t int_index = static_cast<size_t>(index + 0.5); |
96 | int_index = std::max(0LU, std::min(samples.size() - 1, int_index)); |
97 | return samples[int_index].first; |
98 | } |
99 | |
100 | /** If T is not a numeric type, using this method causes a compilation error, |
101 | * but use of error class does not cause. SFINAE. |
102 | * Not SFINAE. Functions members of type templates are simply not checked until they are used. |
103 | */ |
104 | double quantileInterpolated(double level) |
105 | { |
106 | if (samples.empty()) |
107 | return onEmpty<double>(); |
108 | |
109 | sortIfNeeded(); |
110 | |
111 | const double index = std::max(0., std::min(samples.size() - 1., level * (samples.size() - 1))); |
112 | |
113 | /// To get a value from a fractional index, we linearly interpolate between adjacent values. |
114 | size_t left_index = static_cast<size_t>(index); |
115 | size_t right_index = left_index + 1; |
116 | if (right_index == samples.size()) |
117 | return samples[left_index].first; |
118 | |
119 | const double left_coef = right_index - index; |
120 | const double right_coef = index - left_index; |
121 | |
122 | return samples[left_index].first * left_coef + samples[right_index].first * right_coef; |
123 | } |
124 | |
125 | void merge(const ReservoirSamplerDeterministic & b) |
126 | { |
127 | if (sample_count != b.sample_count) |
128 | throw Poco::Exception("Cannot merge ReservoirSamplerDeterministic's with different sample_count" ); |
129 | sorted = false; |
130 | |
131 | if (b.skip_degree > skip_degree) |
132 | { |
133 | skip_degree = b.skip_degree; |
134 | thinOut(); |
135 | } |
136 | |
137 | for (const auto & sample : b.samples) |
138 | if (good(sample.second)) |
139 | insertImpl(sample.first, sample.second); |
140 | |
141 | total_values += b.total_values; |
142 | } |
143 | |
144 | void read(DB::ReadBuffer & buf) |
145 | { |
146 | DB::readIntBinary<size_t>(sample_count, buf); |
147 | DB::readIntBinary<size_t>(total_values, buf); |
148 | samples.resize(std::min(total_values, sample_count)); |
149 | |
150 | for (size_t i = 0; i < samples.size(); ++i) |
151 | DB::readPODBinary(samples[i], buf); |
152 | |
153 | sorted = false; |
154 | } |
155 | |
156 | void write(DB::WriteBuffer & buf) const |
157 | { |
158 | DB::writeIntBinary<size_t>(sample_count, buf); |
159 | DB::writeIntBinary<size_t>(total_values, buf); |
160 | |
161 | for (size_t i = 0; i < std::min(sample_count, total_values); ++i) |
162 | DB::writePODBinary(samples[i], buf); |
163 | } |
164 | |
165 | private: |
166 | /// We allocate some memory on the stack to avoid allocations when there are many objects with a small number of elements. |
167 | using Element = std::pair<T, UInt32>; |
168 | using Array = DB::PODArray<Element, 64>; |
169 | |
170 | size_t sample_count; |
171 | size_t total_values{}; |
172 | bool sorted{}; |
173 | Array samples; |
174 | UInt8 skip_degree{}; |
175 | |
176 | void insertImpl(const T & v, const UInt32 hash) |
177 | { |
178 | /// @todo why + 1? I don't quite recall |
179 | while (samples.size() + 1 >= sample_count) |
180 | { |
181 | if (++skip_degree > detail::MAX_SKIP_DEGREE) |
182 | throw DB::Exception{"skip_degree exceeds maximum value" , DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED}; |
183 | thinOut(); |
184 | } |
185 | |
186 | samples.emplace_back(v, hash); |
187 | } |
188 | |
189 | void thinOut() |
190 | { |
191 | auto size = samples.size(); |
192 | for (size_t i = 0; i < size;) |
193 | { |
194 | if (!good(samples[i].second)) |
195 | { |
196 | /// swap current element with the last one |
197 | std::swap(samples[size - 1], samples[i]); |
198 | --size; |
199 | } |
200 | else |
201 | ++i; |
202 | } |
203 | |
204 | if (size != samples.size()) |
205 | { |
206 | samples.resize(size); |
207 | sorted = false; |
208 | } |
209 | } |
210 | |
211 | void sortIfNeeded() |
212 | { |
213 | if (sorted) |
214 | return; |
215 | sorted = true; |
216 | std::sort(samples.begin(), samples.end(), [] (const std::pair<T, UInt32> & lhs, const std::pair<T, UInt32> & rhs) |
217 | { |
218 | return lhs.first < rhs.first; |
219 | }); |
220 | } |
221 | |
222 | template <typename ResultType> |
223 | ResultType onEmpty() const |
224 | { |
225 | if (OnEmpty == ReservoirSamplerDeterministicOnEmpty::THROW) |
226 | throw Poco::Exception("Quantile of empty ReservoirSamplerDeterministic" ); |
227 | else |
228 | return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue(); |
229 | } |
230 | }; |
231 | |