/*
 * Licensed to Derrick R. Burns under one or more
 * contributor license agreements. See the NOTICES file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <cfloat>
#include <cmath>
#include <limits>
#include <queue>
#include <utility>
#include <vector>

#ifdef min
#undef min
#endif

#ifdef max
#undef max
#endif

namespace duckdb_tdigest {

using Value = double;
using Weight = double;
using Index = size_t;

const size_t kHighWater = 40000;
const double pi = 3.14159265358979323846;
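
// A Centroid summarizes a cluster of nearby samples by its mean and total weight.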
class Centroid {
public:
	Centroid() : Centroid(0.0, 0.0) {
	}

	Centroid(Value mean, Weight weight) : mean_(mean), weight_(weight) {
	}

	inline Value mean() const noexcept {
		return mean_;
	}

	inline Weight weight() const noexcept {
		return weight_;
	}

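	// Merge another centroid into this one, shifting the mean toward c by its share of the combined weight.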
	inline void add(const Centroid &c) {
		// CHECK_GT(c.weight_, 0);
		if (weight_ != 0.0) {
			weight_ += c.weight_;
			mean_ += c.weight_ * (c.mean_ - mean_) / weight_;
		} else {
			weight_ = c.weight_;
			mean_ = c.mean_;
		}
	}

private:
	Value mean_ = 0;
	Weight weight_ = 0;
};

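// A cursor over a sorted vector of centroids, used for the k-way merge in mergeProcessed().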
struct CentroidList {
	explicit CentroidList(const std::vector<Centroid> &s) : iter(s.cbegin()), end(s.cend()) {
	}
	std::vector<Centroid>::const_iterator iter;
	std::vector<Centroid>::const_iterator end;

	bool advance() {
		return ++iter != end;
	}
};

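// Orders CentroidList cursors so that the list whose current centroid has the smallest mean
// sits on top of the priority queue (greater-than makes the queue a min-heap on the mean).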
class CentroidListComparator {
public:
	CentroidListComparator() {
	}

	bool operator()(const CentroidList &left, const CentroidList &right) const {
		return left.iter->mean() > right.iter->mean();
	}
};

using CentroidListQueue = std::priority_queue<CentroidList, std::vector<CentroidList>, CentroidListComparator>;

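// Orders centroids by mean; used when sorting and searching centroid vectors.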
struct CentroidComparator {
	bool operator()(const Centroid &a, const Centroid &b) const {
		return a.mean() < b.mean();
	}
};

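// A t-digest: a mergeable, bounded-size sketch of a distribution built from weighted centroids.
// New values accumulate in an unprocessed buffer and are periodically merged into the sorted,
// compressed processed_ vector, from which quantiles and CDF values are interpolated.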
class TDigest {
	class TDigestComparator {
	public:
		TDigestComparator() {
		}

		bool operator()(const TDigest *left, const TDigest *right) const {
			return left->totalSize() > right->totalSize();
		}
	};

	using TDigestQueue = std::priority_queue<const TDigest *, std::vector<const TDigest *>, TDigestComparator>;

public:
	TDigest() : TDigest(1000) {
	}

	explicit TDigest(Value compression) : TDigest(compression, 0) {
	}

	TDigest(Value compression, Index bufferSize) : TDigest(compression, bufferSize, 0) {
	}

	TDigest(Value compression, Index unmergedSize, Index mergedSize)
	    : compression_(compression), maxProcessed_(processedSize(mergedSize, compression)),
	      maxUnprocessed_(unprocessedSize(unmergedSize, compression)) {
		processed_.reserve(maxProcessed_);
		unprocessed_.reserve(maxUnprocessed_ + 1);
	}

	TDigest(std::vector<Centroid> &&processed, std::vector<Centroid> &&unprocessed, Value compression,
	        Index unmergedSize, Index mergedSize)
	    : TDigest(compression, unmergedSize, mergedSize) {
		processed_ = std::move(processed);
		unprocessed_ = std::move(unprocessed);

		processedWeight_ = weight(processed_);
		unprocessedWeight_ = weight(unprocessed_);
		if (!processed_.empty()) {
			min_ = std::min(min_, processed_[0].mean());
			max_ = std::max(max_, (processed_.cend() - 1)->mean());
		}
		updateCumulative();
	}

	static Weight weight(std::vector<Centroid> &centroids) noexcept {
		Weight w = 0.0;
		for (auto centroid : centroids) {
			w += centroid.weight();
		}
		return w;
	}

	TDigest &operator=(TDigest &&o) {
		compression_ = o.compression_;
		maxProcessed_ = o.maxProcessed_;
		maxUnprocessed_ = o.maxUnprocessed_;
		processedWeight_ = o.processedWeight_;
		unprocessedWeight_ = o.unprocessedWeight_;
		processed_ = std::move(o.processed_);
		unprocessed_ = std::move(o.unprocessed_);
		cumulative_ = std::move(o.cumulative_);
		min_ = o.min_;
		max_ = o.max_;
		return *this;
	}

	TDigest(TDigest &&o)
	    : TDigest(std::move(o.processed_), std::move(o.unprocessed_), o.compression_, o.maxUnprocessed_,
	              o.maxProcessed_) {
	}

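	// When an explicit size is not supplied (size == 0), derive the buffer capacities from the
	// compression parameter: roughly 2*compression processed centroids and 8*compression unprocessed ones.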
	static inline Index processedSize(Index size, Value compression) noexcept {
		return (size == 0) ? static_cast<Index>(2 * std::ceil(compression)) : size;
	}

	static inline Index unprocessedSize(Index size, Value compression) noexcept {
		return (size == 0) ? static_cast<Index>(8 * std::ceil(compression)) : size;
	}

	// merge in another t-digest
	inline void merge(const TDigest *other) {
		std::vector<const TDigest *> others {other};
		add(others.cbegin(), others.cend());
	}

	const std::vector<Centroid> &processed() const {
		return processed_;
	}

	const std::vector<Centroid> &unprocessed() const {
		return unprocessed_;
	}

	Index maxUnprocessed() const {
		return maxUnprocessed_;
	}

	Index maxProcessed() const {
		return maxProcessed_;
	}

	inline void add(std::vector<const TDigest *> digests) {
		add(digests.cbegin(), digests.cend());
	}

	// merge in a vector of t-digests in the most efficient manner possible
	// in constant space
	// works for any value of kHighWater
	void add(std::vector<const TDigest *>::const_iterator iter, std::vector<const TDigest *>::const_iterator end) {
		if (iter != end) {
			auto size = std::distance(iter, end);
			TDigestQueue pq(TDigestComparator {});
			for (; iter != end; iter++) {
				pq.push(*iter);
			}
			std::vector<const TDigest *> batch;
			batch.reserve(size);

			size_t totalSize = 0;
			while (!pq.empty()) {
				auto td = pq.top();
				batch.push_back(td);
				pq.pop();
				totalSize += td->totalSize();
				if (totalSize >= kHighWater || pq.empty()) {
					mergeProcessed(batch);
					mergeUnprocessed(batch);
					processIfNecessary();
					batch.clear();
					totalSize = 0;
				}
			}
			updateCumulative();
		}
	}

	Weight processedWeight() const {
		return processedWeight_;
	}

	Weight unprocessedWeight() const {
		return unprocessedWeight_;
	}

	bool haveUnprocessed() const {
		return unprocessed_.size() > 0;
	}

	size_t totalSize() const {
		return processed_.size() + unprocessed_.size();
	}

	long totalWeight() const {
		return static_cast<long>(processedWeight_ + unprocessedWeight_);
	}

	// return the cdf on the t-digest
	Value cdf(Value x) {
		if (haveUnprocessed() || isDirty()) {
			process();
		}
		return cdfProcessed(x);
	}

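	// the digest needs (re)processing when either buffer has grown past its configured limit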
	bool isDirty() {
		return processed_.size() > maxProcessed_ || unprocessed_.size() > maxUnprocessed_;
	}

	// return the cdf on the processed values
	Value cdfProcessed(Value x) const {
		if (processed_.empty()) {
			// no data to examine
			return 0.0;
		} else if (processed_.size() == 1) {
			// exactly one centroid, should have max_==min_
			auto width = max_ - min_;
			if (x < min_) {
				return 0.0;
			} else if (x > max_) {
				return 1.0;
			} else if (x - min_ <= width) {
				// min_ and max_ are too close together to do any viable interpolation
				return 0.5;
			} else {
				// interpolate if somehow we have weight > 0 and max_ != min_
				return (x - min_) / (max_ - min_);
			}
		} else {
			auto n = processed_.size();
			if (x <= min_) {
				return 0;
			}

			if (x >= max_) {
				return 1;
			}

			// check for the left tail
			if (x <= mean(0)) {
				// note that this is different than mean(0) > min_ ... this guarantees interpolation works
				if (mean(0) - min_ > 0) {
					return (x - min_) / (mean(0) - min_) * weight(0) / processedWeight_ / 2.0;
				} else {
					return 0;
				}
			}

			// and the right tail
			if (x >= mean(n - 1)) {
				if (max_ - mean(n - 1) > 0) {
					return 1.0 - (max_ - x) / (max_ - mean(n - 1)) * weight(n - 1) / processedWeight_ / 2.0;
				} else {
					return 1;
				}
			}

			CentroidComparator cc;
			auto iter = std::upper_bound(processed_.cbegin(), processed_.cend(), Centroid(x, 0), cc);

			auto i = std::distance(processed_.cbegin(), iter);
			auto z1 = x - (iter - 1)->mean();
			auto z2 = iter->mean() - x;
			return weightedAverage(cumulative_[i - 1], z2, cumulative_[i], z1) / processedWeight_;
		}
	}

	// this returns a quantile on the t-digest
	Value quantile(Value q) {
		if (haveUnprocessed() || isDirty()) {
			process();
		}
		return quantileProcessed(q);
	}

	// this returns a quantile on the currently processed values without changing the t-digest
	// the value will not represent the unprocessed values
	Value quantileProcessed(Value q) const {
		if (q < 0 || q > 1) {
			return NAN;
		}

		if (processed_.size() == 0) {
			// no centroids means no data, no way to get a quantile
			return NAN;
		} else if (processed_.size() == 1) {
			// with one data point, all quantiles lead to Rome
			return mean(0);
		}

		// we know that there are at least two centroids now
		auto n = processed_.size();

		// if values were stored in a sorted array, index would be the offset we are interested in
		const auto index = q * processedWeight_;

		// at the boundaries, we return min_ or max_
		if (index <= weight(0) / 2.0) {
			return min_ + 2.0 * index / weight(0) * (mean(0) - min_);
		}

		auto iter = std::lower_bound(cumulative_.cbegin(), cumulative_.cend(), index);

		if (iter + 1 != cumulative_.cend()) {
			auto i = std::distance(cumulative_.cbegin(), iter);
			auto z1 = index - *(iter - 1);
			auto z2 = *iter - index;
			// LOG(INFO) << "z2 " << z2 << " index " << index << " z1 " << z1;
			return weightedAverage(mean(i - 1), z2, mean(i), z1);
		}
		auto z1 = index - processedWeight_ - weight(n - 1) / 2.0;
		auto z2 = weight(n - 1) / 2 - z1;
		return weightedAverage(mean(n - 1), z1, max_, z2);
	}

	Value compression() const {
		return compression_;
	}

	void add(Value x) {
		add(x, 1);
	}

	inline void compress() {
		process();
	}

	// add a single centroid to the unprocessed vector, processing previously unprocessed centroids if our
	// limit has been reached.
	inline bool add(Value x, Weight w) {
		if (std::isnan(x)) {
			return false;
		}
		unprocessed_.push_back(Centroid(x, w));
		unprocessedWeight_ += w;
		processIfNecessary();
		return true;
	}

	inline void add(std::vector<Centroid>::const_iterator iter, std::vector<Centroid>::const_iterator end) {
		while (iter != end) {
			const size_t diff = std::distance(iter, end);
			const size_t room = maxUnprocessed_ - unprocessed_.size();
			auto mid = iter + std::min(diff, room);
			while (iter != mid) {
				unprocessed_.push_back(*(iter++));
			}
			if (unprocessed_.size() >= maxUnprocessed_) {
				process();
			}
		}
	}

private:
	Value compression_;

	Value min_ = std::numeric_limits<Value>::max();

	// use lowest() rather than numeric_limits::min() (the smallest positive double) so that
	// max_ is tracked correctly even when every observed value is negative
	Value max_ = std::numeric_limits<Value>::lowest();

	Index maxProcessed_;

	Index maxUnprocessed_;

	Value processedWeight_ = 0.0;

	Value unprocessedWeight_ = 0.0;

	std::vector<Centroid> processed_;

	std::vector<Centroid> unprocessed_;

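	// cumulative_[i] holds the total weight strictly to the left of centroid i plus half of
	// centroid i's own weight; a final entry carries the total processed weight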
	std::vector<Weight> cumulative_;

	// return mean of i-th centroid
	inline Value mean(int i) const noexcept {
		return processed_[i].mean();
	}

	// return weight of i-th centroid
	inline Weight weight(int i) const noexcept {
		return processed_[i].weight();
	}

	// append all unprocessed centroids into current unprocessed vector
	void mergeUnprocessed(const std::vector<const TDigest *> &tdigests) {
		if (tdigests.size() == 0) {
			return;
		}

		size_t total = unprocessed_.size();
		for (auto &td : tdigests) {
			total += td->unprocessed_.size();
		}

		unprocessed_.reserve(total);
		for (auto &td : tdigests) {
			unprocessed_.insert(unprocessed_.end(), td->unprocessed_.cbegin(), td->unprocessed_.cend());
			unprocessedWeight_ += td->unprocessedWeight_;
		}
	}

	// merge all processed centroids together into a single sorted vector
	void mergeProcessed(const std::vector<const TDigest *> &tdigests) {
		if (tdigests.size() == 0) {
			return;
		}

		size_t total = 0;
		CentroidListQueue pq(CentroidListComparator {});
		for (auto &td : tdigests) {
			auto &sorted = td->processed_;
			auto size = sorted.size();
			if (size > 0) {
				pq.push(CentroidList(sorted));
				total += size;
				processedWeight_ += td->processedWeight_;
			}
		}
		if (total == 0) {
			return;
		}

		if (processed_.size() > 0) {
			pq.push(CentroidList(processed_));
			total += processed_.size();
		}

		std::vector<Centroid> sorted;
		sorted.reserve(total);

		while (!pq.empty()) {
			auto best = pq.top();
			pq.pop();
			sorted.push_back(*(best.iter));
			if (best.advance()) {
				pq.push(best);
			}
		}
		processed_ = std::move(sorted);
		if (processed_.size() > 0) {
			min_ = std::min(min_, processed_[0].mean());
			max_ = std::max(max_, (processed_.cend() - 1)->mean());
		}
	}

	inline void processIfNecessary() {
		if (isDirty()) {
			process();
		}
	}

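	// rebuild cumulative_ from the current processed_ centroids (see the comment on cumulative_)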
	void updateCumulative() {
		const auto n = processed_.size();
		cumulative_.clear();
		cumulative_.reserve(n + 1);
		auto previous = 0.0;
		for (Index i = 0; i < n; i++) {
			auto current = weight(i);
			auto halfCurrent = current / 2.0;
			cumulative_.push_back(previous + halfCurrent);
			previous = previous + current;
		}
		cumulative_.push_back(previous);
	}

	// merges unprocessed_ centroids and processed_ centroids together and processes them
	// when complete, unprocessed_ will be empty and processed_ will have at most maxProcessed_ centroids
	inline void process() {
		CentroidComparator cc;
		std::sort(unprocessed_.begin(), unprocessed_.end(), cc);
		auto count = unprocessed_.size();
		unprocessed_.insert(unprocessed_.end(), processed_.cbegin(), processed_.cend());
		std::inplace_merge(unprocessed_.begin(), unprocessed_.begin() + count, unprocessed_.end(), cc);

		processedWeight_ += unprocessedWeight_;
		unprocessedWeight_ = 0;
		processed_.clear();

		processed_.push_back(unprocessed_[0]);
		Weight wSoFar = unprocessed_[0].weight();
		Weight wLimit = processedWeight_ * integratedQ(1.0);

		auto end = unprocessed_.end();
		for (auto iter = unprocessed_.cbegin() + 1; iter < end; iter++) {
			auto &centroid = *iter;
			Weight projectedW = wSoFar + centroid.weight();
			if (projectedW <= wLimit) {
				wSoFar = projectedW;
				(processed_.end() - 1)->add(centroid);
			} else {
				auto k1 = integratedLocation(wSoFar / processedWeight_);
				wLimit = processedWeight_ * integratedQ(k1 + 1.0);
				wSoFar += centroid.weight();
				processed_.emplace_back(centroid);
			}
		}
		unprocessed_.clear();
		min_ = std::min(min_, processed_[0].mean());
		max_ = std::max(max_, (processed_.cend() - 1)->mean());
		updateCumulative();
	}

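	// diagnostic helpers: count centroids whose weight exceeds the size bound implied by the
	// scale function (i.e. centroids that span more than one unit of centroid scale)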
	inline int checkWeights() {
		return checkWeights(processed_, processedWeight_);
	}

	size_t checkWeights(const std::vector<Centroid> &sorted, Value total) {
		size_t badWeight = 0;
		auto k1 = 0.0;
		auto q = 0.0;
		for (auto iter = sorted.cbegin(); iter != sorted.cend(); iter++) {
			auto w = iter->weight();
			auto dq = w / total;
			auto k2 = integratedLocation(q + dq);
			if (k2 - k1 > 1 && w != 1) {
				badWeight++;
			}
			if (k2 - k1 > 1.5 && w != 1) {
				badWeight++;
			}
			q += dq;
			k1 = k2;
		}

		return badWeight;
	}

	/**
	 * Converts a quantile into a centroid scale value. The centroid scale is nominally
	 * the number k of the centroid that a quantile point q should belong to. Due to
	 * round-offs, however, we can't align things perfectly without splitting points
	 * and centroids. We don't want to do that, so we have to allow for offsets.
	 * In the end, the criterion is that any quantile range that spans a centroid
	 * scale range of more than one should be split across more than one centroid if
	 * possible. This won't be possible if the quantile range refers to a single point
	 * or an already existing centroid.
	 * <p/>
	 * This mapping is steep near q=0 or q=1 so each centroid there will correspond to
	 * less q range. Near q=0.5, the mapping is flatter so that centroids there will
	 * represent a larger chunk of quantiles.
	 *
	 * @param q The quantile scale value to be mapped.
	 * @return The centroid scale value corresponding to q.
	 */
	inline Value integratedLocation(Value q) const {
		return compression_ * (std::asin(2.0 * q - 1.0) + pi / 2) / pi;
	}

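	// Inverse of integratedLocation: maps a centroid scale value k back to a quantile in [0, 1].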
	inline Value integratedQ(Value k) const {
		return (std::sin(std::min(k, compression_) * pi / compression_ - pi / 2) + 1) / 2;
	}

	/**
	 * Same as {@link #weightedAverageSorted(Value, Value, Value, Value)} but flips
	 * the order of the variables if <code>x2</code> is greater than
	 * <code>x1</code>.
	 */
	static Value weightedAverage(Value x1, Value w1, Value x2, Value w2) {
		return (x1 <= x2) ? weightedAverageSorted(x1, w1, x2, w2) : weightedAverageSorted(x2, w2, x1, w1);
	}

	/**
	 * Compute the weighted average between <code>x1</code> with a weight of
	 * <code>w1</code> and <code>x2</code> with a weight of <code>w2</code>.
	 * This expects <code>x1</code> to be less than or equal to <code>x2</code>
	 * and is guaranteed to return a number between <code>x1</code> and
	 * <code>x2</code>.
	 */
	static Value weightedAverageSorted(Value x1, Value w1, Value x2, Value w2) {
		const Value x = (x1 * w1 + x2 * w2) / (w1 + w2);
		return std::max(x1, std::min(x, x2));
	}

	static Value interpolate(Value x, Value x0, Value x1) {
		return (x - x0) / (x1 - x0);
	}

	/**
	 * Computes an interpolated value of a quantile that lies between two centroids.
	 *
	 * Index is the quantile desired multiplied by the total number of samples - 1.
	 *
	 * @param index Denormalized quantile desired
	 * @param previousIndex The denormalized quantile corresponding to the center of the previous centroid.
	 * @param nextIndex The denormalized quantile corresponding to the center of the following centroid.
	 * @param previousMean The mean of the previous centroid.
	 * @param nextMean The mean of the following centroid.
	 * @return The interpolated mean.
	 */
	static Value quantile(Value index, Value previousIndex, Value nextIndex, Value previousMean, Value nextMean) {
		const auto delta = nextIndex - previousIndex;
		const auto previousWeight = (nextIndex - index) / delta;
		const auto nextWeight = (index - previousIndex) / delta;
		return previousMean * previousWeight + nextMean * nextWeight;
	}
};
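
// Illustrative usage sketch (not part of the original library): stream raw samples into a
// digest and query it afterwards. The function name and the choice of compression = 100.0
// are hypothetical; any positive compression works, trading memory for accuracy.
inline Value ExampleMedian(const std::vector<Value> &samples) {
	TDigest td(100.0);
	for (auto v : samples) {
		td.add(v); // NaN inputs are silently ignored by add()
	}
	td.compress();           // fold the unprocessed buffer into the sorted centroids
	return td.quantile(0.5); // interpolated median estimate
}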

} // namespace duckdb_tdigest