1 | /* |
2 | * Copyright 2012-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/stats/TDigest.h> |
18 | #include <folly/stats/detail/DoubleRadixSort.h> |
19 | |
20 | #include <algorithm> |
21 | #include <limits> |
22 | |
23 | namespace folly { |
24 | |
25 | /* |
26 | * A good biased scaling function has the following properties: |
27 | * - The value of the function k(0, delta) = 0, and k(1, delta) = delta. |
28 | * This is a requirement for any t-digest function. |
29 | * - The limit of the derivative of the function dk/dq at 0 is inf, and at |
30 | * 1 is inf. This provides bias to improve accuracy at the tails. |
31 | * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy |
32 | * of upper and lower quantiles are equivalent. |
33 | * |
34 | * The scaling function used here is... |
35 | * k(q, d) = (IF q >= 0.5, d - d * sqrt(2 - 2q) / 2, d * sqrt(2q) / 2) |
36 | * |
37 | * k(0, d) = 0 |
38 | * k(1, d) = d |
39 | * |
 * dk/dq = (IF q >= 0.5, d / (2 * sqrt(2 - 2q)), d / (2 * sqrt(2q)))
41 | * limit q->1 dk/dq = inf |
42 | * limit q->0 dk/dq = inf |
43 | * |
44 | * When plotted, the derivative function is symmetric, centered at q=0.5. |
45 | * |
46 | * Note that FMA has been tested here, but benchmarks have not shown it to be a |
47 | * performance improvement. |
48 | */ |
49 | |
50 | /* |
51 | * q_to_k is unused but left here as a comment for completeness. |
52 | * double q_to_k(double q, double d) { |
53 | * if (q >= 0.5) { |
54 | * return d - d * std::sqrt(0.5 - 0.5 * q); |
55 | * } |
56 | * return d * std::sqrt(0.5 * q); |
57 | * } |
58 | */ |
59 | |
/*
 * Inverse of the scaling function: maps k in [0, d] back to a quantile
 * q in [0, 1]. Piecewise inverse of k(q, d) documented above; the two
 * halves are mirror images around q = 0.5.
 */
static double k_to_q(double k, double d) {
  const double ratio = k / d;
  if (ratio < 0.5) {
    // Lower half: q = 2 * (k/d)^2.
    return 2 * ratio * ratio;
  }
  // Upper half: reflect the lower-half parabola about q = 0.5.
  const double complement = 1 - ratio;
  return 1 - 2 * complement * complement;
}
69 | |
/*
 * Saturate v into [lo, hi]. The upper bound is tested first (preserving
 * the original check order); a NaN v fails both comparisons and passes
 * through unchanged.
 */
static double clamp(double v, double lo, double hi) {
  if (v > hi) {
    return hi;
  }
  return (v < lo) ? lo : v;
}
78 | |
79 | TDigest::TDigest( |
80 | std::vector<Centroid> centroids, |
81 | double sum, |
82 | double count, |
83 | double max_val, |
84 | double min_val, |
85 | size_t maxSize) |
86 | : maxSize_(maxSize), |
87 | sum_(sum), |
88 | count_(count), |
89 | max_(max_val), |
90 | min_(min_val) { |
91 | if (centroids.size() <= maxSize_) { |
92 | centroids_ = std::move(centroids); |
93 | } else { |
94 | // Number of centroids is greater than maxSize, we need to compress them |
95 | // When merging, resulting digest takes the maxSize of the first digest |
96 | auto sz = centroids.size(); |
97 | std::array<TDigest, 2> digests{{ |
98 | TDigest(maxSize_), |
99 | TDigest(std::move(centroids), sum_, count_, max_, min_, sz), |
100 | }}; |
101 | *this = this->merge(digests); |
102 | } |
103 | } |
104 | |
105 | // Merge unsorted values by first sorting them. Use radix sort if |
106 | // possible. This implementation puts all additional memory in the |
107 | // heap, so that if called from fiber context we do not smash the |
108 | // stack. Otherwise it is very similar to boost::spreadsort. |
109 | TDigest TDigest::merge(Range<const double*> unsortedValues) const { |
110 | auto n = unsortedValues.size(); |
111 | |
112 | // We require 256 buckets per byte level, plus one count array we can reuse. |
113 | std::unique_ptr<uint64_t[]> buckets{new uint64_t[256 * 9]}; |
114 | // Allocate input and tmp array |
115 | std::unique_ptr<double[]> tmp{new double[n * 2]}; |
116 | auto out = tmp.get() + n; |
117 | auto in = tmp.get(); |
118 | std::copy(unsortedValues.begin(), unsortedValues.end(), in); |
119 | |
120 | detail::double_radix_sort(n, buckets.get(), in, out); |
121 | DCHECK(std::is_sorted(in, in + n)); |
122 | |
123 | return merge(presorted, Range<const double*>(in, in + n)); |
124 | } |
125 | |
// Merge a batch of already-sorted values into this digest, producing a new
// compressed digest. Walks the existing centroids and the sorted values in
// tandem (a classic two-way merge), greedily packing consecutive mass into
// one centroid until the k-scale quantile limit is crossed.
TDigest TDigest::merge(presorted_t, Range<const double*> sortedValues) const {
  // Nothing to merge: return a copy of this digest unchanged.
  if (sortedValues.empty()) {
    return *this;
  }

  TDigest result(maxSize_);

  // Each raw value carries weight 1, so the new count is old count + n.
  result.count_ = count_ + sortedValues.size();

  // Candidate extremes come from the ends of the sorted input.
  double maybeMin = *sortedValues.begin();
  double maybeMax = *(sortedValues.end() - 1);
  if (count_ > 0) {
    // We know that min_ and max_ are numbers
    result.min_ = std::min(min_, maybeMin);
    result.max_ = std::max(max_, maybeMax);
  } else {
    // We know that min_ and max_ are NaN.
    result.min_ = maybeMin;
    result.max_ = maybeMax;
  }

  std::vector<Centroid> compressed;
  compressed.reserve(maxSize_);

  // q_limit_times_count is the cumulative-weight threshold at which the
  // current centroid must be emitted and a new one started. k_limit is
  // post-incremented so the first limit uses k = 1, the next k = 2, etc.
  double k_limit = 1;
  double q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;

  auto it_centroids = centroids_.begin();
  auto it_sortedValues = sortedValues.begin();

  // Seed `cur` with the smaller of the first existing centroid and the
  // first sorted value (sortedValues is non-empty here).
  Centroid cur;
  if (it_centroids != centroids_.end() &&
      it_centroids->mean() < *it_sortedValues) {
    cur = *it_centroids++;
  } else {
    cur = Centroid(*it_sortedValues++, 1.0);
  }

  double weightSoFar = cur.weight();

  // Keep track of sums along the way to reduce expensive floating points
  double sumsToMerge = 0;
  double weightsToMerge = 0;

  // Two-way merge: repeatedly take the smaller head of the two sorted
  // streams until both are exhausted.
  while (it_centroids != centroids_.end() ||
         it_sortedValues != sortedValues.end()) {
    Centroid next;

    if (it_centroids != centroids_.end() &&
        (it_sortedValues == sortedValues.end() ||
         it_centroids->mean() < *it_sortedValues)) {
      next = *it_centroids++;
    } else {
      next = Centroid(*it_sortedValues++, 1.0);
    }

    double nextSum = next.mean() * next.weight();
    weightSoFar += next.weight();

    if (weightSoFar <= q_limit_times_count) {
      // Still within the current quantile bucket: defer the fold into
      // `cur` by accumulating sums/weights lazily.
      sumsToMerge += nextSum;
      weightsToMerge += next.weight();
    } else {
      // Bucket boundary crossed: fold the accumulated mass into `cur`,
      // emit it, advance the k-based limit, and start a new centroid.
      result.sum_ += cur.add(sumsToMerge, weightsToMerge);
      sumsToMerge = 0;
      weightsToMerge = 0;
      compressed.push_back(cur);
      q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
      cur = next;
    }
  }
  // Flush the final centroid.
  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
  compressed.push_back(cur);
  compressed.shrink_to_fit();

  // Deal with floating point precision
  std::sort(compressed.begin(), compressed.end());

  result.centroids_ = std::move(compressed);
  return result;
}
207 | |
208 | TDigest TDigest::merge(Range<const TDigest*> digests) { |
209 | size_t nCentroids = 0; |
210 | for (auto it = digests.begin(); it != digests.end(); it++) { |
211 | nCentroids += it->centroids_.size(); |
212 | } |
213 | |
214 | if (nCentroids == 0) { |
215 | return TDigest(); |
216 | } |
217 | |
218 | std::vector<Centroid> centroids; |
219 | centroids.reserve(nCentroids); |
220 | |
221 | std::vector<std::vector<Centroid>::iterator> starts; |
222 | starts.reserve(digests.size()); |
223 | |
224 | double count = 0; |
225 | |
226 | // We can safely use these limits to avoid isnan checks below because we know |
227 | // nCentroids > 0, so at least one TDigest has a min and max. |
228 | double min = std::numeric_limits<double>::infinity(); |
229 | double max = -std::numeric_limits<double>::infinity(); |
230 | |
231 | for (auto it = digests.begin(); it != digests.end(); it++) { |
232 | starts.push_back(centroids.end()); |
233 | double curCount = it->count(); |
234 | if (curCount > 0) { |
235 | DCHECK(!std::isnan(it->min_)); |
236 | DCHECK(!std::isnan(it->max_)); |
237 | min = std::min(min, it->min_); |
238 | max = std::max(max, it->max_); |
239 | count += curCount; |
240 | for (const auto& centroid : it->centroids_) { |
241 | centroids.push_back(centroid); |
242 | } |
243 | } |
244 | } |
245 | |
246 | for (size_t digestsPerBlock = 1; digestsPerBlock < starts.size(); |
247 | digestsPerBlock *= 2) { |
248 | // Each sorted block is digestPerBlock digests big. For each step, try to |
249 | // merge two blocks together. |
250 | for (size_t i = 0; i < starts.size(); i += (digestsPerBlock * 2)) { |
251 | // It is possible that this block is incomplete (less than digestsPerBlock |
252 | // big). In that case, the rest of the block is sorted and leave it alone |
253 | if (i + digestsPerBlock < starts.size()) { |
254 | auto first = starts[i]; |
255 | auto middle = starts[i + digestsPerBlock]; |
256 | |
257 | // It is possible that the next block is incomplete (less than |
258 | // digestsPerBlock big). In that case, merge to end. Otherwise, merge to |
259 | // the end of that block. |
260 | std::vector<Centroid>::iterator last = |
261 | (i + (digestsPerBlock * 2) < starts.size()) |
262 | ? *(starts.begin() + i + 2 * digestsPerBlock) |
263 | : centroids.end(); |
264 | std::inplace_merge(first, middle, last); |
265 | } |
266 | } |
267 | } |
268 | |
269 | DCHECK(std::is_sorted(centroids.begin(), centroids.end())); |
270 | |
271 | size_t maxSize = digests.begin()->maxSize_; |
272 | TDigest result(maxSize); |
273 | |
274 | std::vector<Centroid> compressed; |
275 | compressed.reserve(maxSize); |
276 | |
277 | double k_limit = 1; |
278 | double q_limit_times_count = k_to_q(k_limit, maxSize) * count; |
279 | |
280 | Centroid cur = centroids.front(); |
281 | double weightSoFar = cur.weight(); |
282 | double sumsToMerge = 0; |
283 | double weightsToMerge = 0; |
284 | for (auto it = centroids.begin() + 1; it != centroids.end(); ++it) { |
285 | weightSoFar += it->weight(); |
286 | if (weightSoFar <= q_limit_times_count) { |
287 | sumsToMerge += it->mean() * it->weight(); |
288 | weightsToMerge += it->weight(); |
289 | } else { |
290 | result.sum_ += cur.add(sumsToMerge, weightsToMerge); |
291 | sumsToMerge = 0; |
292 | weightsToMerge = 0; |
293 | compressed.push_back(cur); |
294 | q_limit_times_count = k_to_q(k_limit++, maxSize) * count; |
295 | cur = *it; |
296 | } |
297 | } |
298 | result.sum_ += cur.add(sumsToMerge, weightsToMerge); |
299 | compressed.push_back(cur); |
300 | compressed.shrink_to_fit(); |
301 | |
302 | // Deal with floating point precision |
303 | std::sort(compressed.begin(), compressed.end()); |
304 | |
305 | result.count_ = count; |
306 | result.min_ = min; |
307 | result.max_ = max; |
308 | result.centroids_ = std::move(compressed); |
309 | return result; |
310 | } |
311 | |
// Estimate the value at quantile q in [0, 1] by locating the centroid that
// contains rank q * count_ and interpolating linearly within it.
double TDigest::estimateQuantile(double q) const {
  // An empty digest carries no information; return 0 by convention.
  if (centroids_.empty()) {
    return 0.0;
  }
  // Target rank: the cumulative weight corresponding to quantile q.
  double rank = q * count_;

  // pos: index of the centroid containing the rank.
  // t: cumulative weight of all centroids before centroids_[pos].
  size_t pos;
  double t;
  if (q > 0.5) {
    if (q >= 1.0) {
      return max_;
    }
    // High quantiles: scan from the upper end, subtracting weights, so
    // less floating-point error accumulates near the tail of interest.
    pos = 0;
    t = count_;
    for (auto rit = centroids_.rbegin(); rit != centroids_.rend(); ++rit) {
      t -= rit->weight();
      if (rank >= t) {
        // Convert the reverse iterator to a forward index.
        pos = std::distance(rit, centroids_.rend()) - 1;
        break;
      }
    }
  } else {
    if (q <= 0.0) {
      return min_;
    }
    // Low quantiles: scan from the lower end, adding weights.
    pos = centroids_.size() - 1;
    t = 0;
    for (auto it = centroids_.begin(); it != centroids_.end(); ++it) {
      if (rank < t + it->weight()) {
        pos = std::distance(centroids_.begin(), it);
        break;
      }
      t += it->weight();
    }
  }

  // delta approximates the span of values covered by the selected
  // centroid (half the distance between its neighbors in the interior);
  // min/max clamp the interpolated value to the neighboring centroid
  // means, or to the digest extremes at the edges.
  double delta = 0;
  double min = min_;
  double max = max_;
  if (centroids_.size() > 1) {
    if (pos == 0) {
      delta = centroids_[pos + 1].mean() - centroids_[pos].mean();
      max = centroids_[pos + 1].mean();
    } else if (pos == centroids_.size() - 1) {
      delta = centroids_[pos].mean() - centroids_[pos - 1].mean();
      min = centroids_[pos - 1].mean();
    } else {
      delta = (centroids_[pos + 1].mean() - centroids_[pos - 1].mean()) / 2;
      min = centroids_[pos - 1].mean();
      max = centroids_[pos + 1].mean();
    }
  }
  // (rank - t) / weight is the fractional position within the centroid;
  // subtracting 0.5 centers the interpolation on the centroid's mean.
  auto value = centroids_[pos].mean() +
      ((rank - t) / centroids_[pos].weight() - 0.5) * delta;
  return clamp(value, min, max);
}
368 | |
369 | double TDigest::Centroid::add(double sum, double weight) { |
370 | sum += (mean_ * weight_); |
371 | weight_ += weight; |
372 | mean_ = sum / weight_; |
373 | return sum; |
374 | } |
375 | |
376 | } // namespace folly |
377 | |