/*
 * Copyright 2012-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/stats/TDigest.h>

#include <algorithm>
#include <array>
#include <cmath>
#include <cstdint>
#include <limits>
#include <memory>

#include <glog/logging.h>

#include <folly/stats/detail/DoubleRadixSort.h>

namespace folly {

/*
 * A good biased scaling function has the following properties:
 * - The value of the function k(0, delta) = 0, and k(1, delta) = delta.
 *   This is a requirement for any t-digest function.
 * - The limit of the derivative of the function dk/dq at 0 is inf, and at
 *   1 is inf. This provides bias to improve accuracy at the tails.
 * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy
 *   of upper and lower quantiles is equivalent.
 *
 * The scaling function used here is:
 *   k(q, d) = (IF q >= 0.5, d - d * sqrt(2 - 2q) / 2, d * sqrt(2q) / 2)
 *
 *   k(0, d) = 0
 *   k(1, d) = d
 *
 *   dk/dq = (IF q >= 0.5, d / (2 * sqrt(2 - 2q)), d / (2 * sqrt(2q)))
 *   limit q->1 dk/dq = inf
 *   limit q->0 dk/dq = inf
 *
 * When plotted, the derivative function is symmetric, centered at q = 0.5.
 *
 * Note that FMA has been tested here, but benchmarks have not shown it to be
 * a performance improvement.
 */
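
/*
 * For example, with d = 100:
 *   k(0.125, 100) = 100 * sqrt(0.25) / 2       = 25
 *   k(0.5, 100)   = 100 - 100 * sqrt(1.0) / 2  = 50
 *   k(0.875, 100) = 100 - 100 * sqrt(0.25) / 2 = 75
 * The quantiles q = 0.125 and q = 0.875 land equidistant from k(0.5, d) =
 * d / 2, reflecting the symmetry of dk/dq about q = 0.5 described above.
 */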

/*
 * q_to_k is unused but left here as a comment for completeness.
 * double q_to_k(double q, double d) {
 *   if (q >= 0.5) {
 *     return d - d * std::sqrt(0.5 - 0.5 * q);
 *   }
 *   return d * std::sqrt(0.5 * q);
 * }
 */

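/*
 * k_to_q inverts the scaling function k(q, d) above, recovering q from k.
 * Round-trip check (illustrative): with d = 100 and k = 25,
 * k_to_q(25, 100) = 2 * 0.25^2 = 0.125, and
 * q_to_k(0.125, 100) = 100 * sqrt(0.5 * 0.125) = 25.
 */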
static double k_to_q(double k, double d) {
  double k_div_d = k / d;
  if (k_div_d >= 0.5) {
    double base = 1 - k_div_d;
    return 1 - 2 * base * base;
  } else {
    return 2 * k_div_d * k_div_d;
  }
}

static double clamp(double v, double lo, double hi) {
  if (v > hi) {
    return hi;
  } else if (v < lo) {
    return lo;
  }
  return v;
}

TDigest::TDigest(
    std::vector<Centroid> centroids,
    double sum,
    double count,
    double max_val,
    double min_val,
    size_t maxSize)
    : maxSize_(maxSize),
      sum_(sum),
      count_(count),
      max_(max_val),
      min_(min_val) {
  if (centroids.size() <= maxSize_) {
    centroids_ = std::move(centroids);
  } else {
    // The number of centroids is greater than maxSize, so we need to compress
    // them. When merging, the resulting digest takes the maxSize of the first
    // digest, so an empty digest with maxSize_ goes first.
    auto sz = centroids.size();
    std::array<TDigest, 2> digests{{
        TDigest(maxSize_),
        TDigest(std::move(centroids), sum_, count_, max_, min_, sz),
    }};
    *this = this->merge(digests);
  }
}

// Merge unsorted values by first sorting them. Use radix sort if possible.
// This implementation puts all additional memory on the heap, so that if
// called from a fiber context we do not smash the stack. Otherwise it is
// very similar to boost::spreadsort.
TDigest TDigest::merge(Range<const double*> unsortedValues) const {
  auto n = unsortedValues.size();

  // We require 256 buckets per byte level, plus one count array we can reuse.
  std::unique_ptr<uint64_t[]> buckets{new uint64_t[256 * 9]};
  // Allocate the input and tmp arrays as a single heap block.
  std::unique_ptr<double[]> tmp{new double[n * 2]};
  auto out = tmp.get() + n;
  auto in = tmp.get();
  std::copy(unsortedValues.begin(), unsortedValues.end(), in);

  detail::double_radix_sort(n, buckets.get(), in, out);
  DCHECK(std::is_sorted(in, in + n));

  return merge(presorted, Range<const double*>(in, in + n));
}
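
/*
 * Illustrative usage (not part of the original source):
 *
 *   TDigest digest(100);
 *   std::vector<double> values = {12.0, 5.0, 7.5};
 *   digest = digest.merge(folly::Range<const double*>(
 *       values.data(), values.data() + values.size()));
 */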

TDigest TDigest::merge(presorted_t, Range<const double*> sortedValues) const {
  if (sortedValues.empty()) {
    return *this;
  }

  TDigest result(maxSize_);

  result.count_ = count_ + sortedValues.size();

  double maybeMin = *sortedValues.begin();
  double maybeMax = *(sortedValues.end() - 1);
  if (count_ > 0) {
    // We know that min_ and max_ are numbers.
    result.min_ = std::min(min_, maybeMin);
    result.max_ = std::max(max_, maybeMax);
  } else {
    // We know that min_ and max_ are NaN.
    result.min_ = maybeMin;
    result.max_ = maybeMax;
  }

  std::vector<Centroid> compressed;
  compressed.reserve(maxSize_);

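  // q_limit_times_count is the weight boundary of the current k-interval:
  // incoming centroids accumulate into the current centroid until the
  // cumulative weight crosses this boundary, at which point the centroid is
  // emitted and the limit advances to the next k.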
  double k_limit = 1;
  double q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;

  auto it_centroids = centroids_.begin();
  auto it_sortedValues = sortedValues.begin();

  Centroid cur;
  if (it_centroids != centroids_.end() &&
      it_centroids->mean() < *it_sortedValues) {
    cur = *it_centroids++;
  } else {
    cur = Centroid(*it_sortedValues++, 1.0);
  }

  double weightSoFar = cur.weight();

  // Keep track of sums along the way to reduce expensive floating point
  // operations.
  double sumsToMerge = 0;
  double weightsToMerge = 0;

  while (it_centroids != centroids_.end() ||
         it_sortedValues != sortedValues.end()) {
    Centroid next;

    if (it_centroids != centroids_.end() &&
        (it_sortedValues == sortedValues.end() ||
         it_centroids->mean() < *it_sortedValues)) {
      next = *it_centroids++;
    } else {
      next = Centroid(*it_sortedValues++, 1.0);
    }

    double nextSum = next.mean() * next.weight();
    weightSoFar += next.weight();

    if (weightSoFar <= q_limit_times_count) {
      sumsToMerge += nextSum;
      weightsToMerge += next.weight();
    } else {
      result.sum_ += cur.add(sumsToMerge, weightsToMerge);
      sumsToMerge = 0;
      weightsToMerge = 0;
      compressed.push_back(cur);
      q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
      cur = next;
    }
  }
  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
  compressed.push_back(cur);
  compressed.shrink_to_fit();

  // Deal with floating point precision: rounding in Centroid::add() can
  // perturb the ordering of the means.
  std::sort(compressed.begin(), compressed.end());

  result.centroids_ = std::move(compressed);
  return result;
}

TDigest TDigest::merge(Range<const TDigest*> digests) {
  size_t nCentroids = 0;
  for (auto it = digests.begin(); it != digests.end(); it++) {
    nCentroids += it->centroids_.size();
  }

  if (nCentroids == 0) {
    return TDigest();
  }

  std::vector<Centroid> centroids;
  centroids.reserve(nCentroids);

  std::vector<std::vector<Centroid>::iterator> starts;
  starts.reserve(digests.size());

  double count = 0;

  // We can safely use these limits to avoid isnan checks below because we know
  // nCentroids > 0, so at least one TDigest has a min and max.
  double min = std::numeric_limits<double>::infinity();
  double max = -std::numeric_limits<double>::infinity();

  for (auto it = digests.begin(); it != digests.end(); it++) {
    starts.push_back(centroids.end());
    double curCount = it->count();
    if (curCount > 0) {
      DCHECK(!std::isnan(it->min_));
      DCHECK(!std::isnan(it->max_));
      min = std::min(min, it->min_);
      max = std::max(max, it->max_);
      count += curCount;
      for (const auto& centroid : it->centroids_) {
        centroids.push_back(centroid);
      }
    }
  }

  for (size_t digestsPerBlock = 1; digestsPerBlock < starts.size();
       digestsPerBlock *= 2) {
    // Each sorted block is digestsPerBlock digests big. For each step, try to
    // merge two blocks together.
    for (size_t i = 0; i < starts.size(); i += (digestsPerBlock * 2)) {
      // It is possible that this block is incomplete (less than
      // digestsPerBlock big). In that case, the rest of the block is already
      // sorted, so leave it alone.
      if (i + digestsPerBlock < starts.size()) {
        auto first = starts[i];
        auto middle = starts[i + digestsPerBlock];

        // It is possible that the next block is incomplete (less than
        // digestsPerBlock big). In that case, merge to the end. Otherwise,
        // merge to the end of that block.
        std::vector<Centroid>::iterator last =
            (i + (digestsPerBlock * 2) < starts.size())
            ? *(starts.begin() + i + 2 * digestsPerBlock)
            : centroids.end();
        std::inplace_merge(first, middle, last);
      }
    }
  }
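
  // For example, with three non-empty digests, the first pass
  // (digestsPerBlock = 1) merges blocks {0} and {1}, and the second pass
  // (digestsPerBlock = 2) merges the combined block {0, 1} with {2}, after
  // which the whole centroid list is sorted.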

  DCHECK(std::is_sorted(centroids.begin(), centroids.end()));

  size_t maxSize = digests.begin()->maxSize_;
  TDigest result(maxSize);

  std::vector<Centroid> compressed;
  compressed.reserve(maxSize);

  double k_limit = 1;
  double q_limit_times_count = k_to_q(k_limit++, maxSize) * count;

  Centroid cur = centroids.front();
  double weightSoFar = cur.weight();
  double sumsToMerge = 0;
  double weightsToMerge = 0;
  for (auto it = centroids.begin() + 1; it != centroids.end(); ++it) {
    weightSoFar += it->weight();
    if (weightSoFar <= q_limit_times_count) {
      sumsToMerge += it->mean() * it->weight();
      weightsToMerge += it->weight();
    } else {
      result.sum_ += cur.add(sumsToMerge, weightsToMerge);
      sumsToMerge = 0;
      weightsToMerge = 0;
      compressed.push_back(cur);
      q_limit_times_count = k_to_q(k_limit++, maxSize) * count;
      cur = *it;
    }
  }
  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
  compressed.push_back(cur);
  compressed.shrink_to_fit();

  // Deal with floating point precision: rounding in Centroid::add() can
  // perturb the ordering of the means.
  std::sort(compressed.begin(), compressed.end());

  result.count_ = count;
  result.min_ = min;
  result.max_ = max;
  result.centroids_ = std::move(compressed);
  return result;
}
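
/*
 * Illustrative usage (not part of the original source): combining digests
 * built independently, e.g. one per thread, following the same std::array
 * pattern the constructor above uses.
 *
 *   std::array<TDigest, 2> digests{{digestA, digestB}};
 *   TDigest combined = TDigest::merge(digests);
 */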
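
/*
 * Worked example (illustrative): for a digest holding three unit-weight
 * centroids with means 1, 2, and 3 (count_ = 3), estimateQuantile(0.5)
 * computes rank = 1.5, lands on the middle centroid (pos = 1, t = 1), takes
 * delta = (3 - 1) / 2 = 1, and returns 2 + ((1.5 - 1) / 1 - 0.5) * 1 = 2.
 */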
double TDigest::estimateQuantile(double q) const {
  if (centroids_.empty()) {
    return 0.0;
  }
  double rank = q * count_;

  size_t pos;
  double t;
  if (q > 0.5) {
    if (q >= 1.0) {
      return max_;
    }
    pos = 0;
    t = count_;
    for (auto rit = centroids_.rbegin(); rit != centroids_.rend(); ++rit) {
      t -= rit->weight();
      if (rank >= t) {
        pos = std::distance(rit, centroids_.rend()) - 1;
        break;
      }
    }
  } else {
    if (q <= 0.0) {
      return min_;
    }
    pos = centroids_.size() - 1;
    t = 0;
    for (auto it = centroids_.begin(); it != centroids_.end(); ++it) {
      if (rank < t + it->weight()) {
        pos = std::distance(centroids_.begin(), it);
        break;
      }
      t += it->weight();
    }
  }

  double delta = 0;
  double min = min_;
  double max = max_;
  if (centroids_.size() > 1) {
    if (pos == 0) {
      delta = centroids_[pos + 1].mean() - centroids_[pos].mean();
      max = centroids_[pos + 1].mean();
    } else if (pos == centroids_.size() - 1) {
      delta = centroids_[pos].mean() - centroids_[pos - 1].mean();
      min = centroids_[pos - 1].mean();
    } else {
      delta = (centroids_[pos + 1].mean() - centroids_[pos - 1].mean()) / 2;
      min = centroids_[pos - 1].mean();
      max = centroids_[pos + 1].mean();
    }
  }
  auto value = centroids_[pos].mean() +
      ((rank - t) / centroids_[pos].weight() - 0.5) * delta;
  return clamp(value, min, max);
}
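
/*
 * Folds an accumulated (sum, weight) pair into this centroid: the centroid's
 * existing sum (mean_ * weight_) is added to the incoming sum, the weights
 * are combined, and the mean is recomputed. Returns the centroid's new total
 * sum so that callers can accumulate it into the digest's sum_.
 */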
double TDigest::Centroid::add(double sum, double weight) {
  sum += (mean_ * weight_);
  weight_ += weight;
  mean_ = sum / weight_;
  return sum;
}

} // namespace folly