#pragma once

#include <algorithm>
#include <cmath>
#include <cstring>
#include <limits>
#include <new>

#include <boost/noncopyable.hpp>

#include <Common/Exception.h>
#include <Common/HashTable/Hash.h>
#include <Common/PODArray.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <common/likely.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/** Calculates the quantile of times in milliseconds, for values less than 30 seconds.
  * If a value is greater than 30 seconds, it is clamped to 30 seconds.
  *
  * If the total number of values is not greater than about 5670, the calculation is exact.
  *
  * Otherwise:
  *  if the time is less than 1024 ms, the calculation is exact;
  *  otherwise, the result is rounded to a multiple of 16 ms.
  *
  * Three different data structures are used:
  * - a flat array (of all encountered values) of fixed length, allocated in place, 64 bytes in size; stores 0..31 values;
  * - a flat array (of all encountered values), allocated separately, of growing length;
  * - a histogram (that is, value -> count), consisting of two parts:
  *   -- for values from 0 to 1023 - with a step of 1;
  *   -- for values from 1024 to 30000 - with a step of 16.
  */
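/** A rough usage sketch (for illustration only; in practice the structure is driven by the
  * quantile aggregate function code, and the unused template parameter is shown here as `void`):
  *
  *     QuantileTiming<void> timing;
  *     timing.add(3);                       /// values are times in milliseconds
  *     timing.add(17);
  *     timing.add(40000);                   /// anything above 30000 is clamped to 30000
  *     UInt16 median = timing.get(0.5);     /// level must be in [0, 1]
  *     float p99 = timing.getFloat(0.99);   /// returns NaN for an empty state
  */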

#define TINY_MAX_ELEMS 31
#define BIG_THRESHOLD 30000

namespace detail
{
    /** Helper structure for optimization in the case of a small number of values:
      * a flat array of fixed size "on the stack", into which all encountered values are placed in succession.
      * Size - 64 bytes. Must be a POD type (it is used in a union).
      */
    struct QuantileTimingTiny
    {
        mutable UInt16 elems[TINY_MAX_ELEMS];    /// mutable because sorting the array is not considered a state change.
        /// It's important that `count` is at the end of the structure, since the beginning of the structure will later be overwritten by other objects.
        /// It must be initialized to zero by the user.
        /// Why? The `count` field is reused even when the union contains other structures
        ///  (which are too small to reach this field).
        UInt16 count;

        /// Can only be used while `count < TINY_MAX_ELEMS`.
        void insert(UInt64 x)
        {
            if (unlikely(x > BIG_THRESHOLD))
                x = BIG_THRESHOLD;

            elems[count] = x;
            ++count;
        }

        /// Can only be used while `count + rhs.count <= TINY_MAX_ELEMS`.
        void merge(const QuantileTimingTiny & rhs)
        {
            for (size_t i = 0; i < rhs.count; ++i)
            {
                elems[count] = rhs.elems[i];
                ++count;
            }
        }

        void serialize(WriteBuffer & buf) const
        {
            writeBinary(count, buf);
            buf.write(reinterpret_cast<const char *>(elems), count * sizeof(elems[0]));
        }

        void deserialize(ReadBuffer & buf)
        {
            readBinary(count, buf);
            buf.readStrict(reinterpret_cast<char *>(elems), count * sizeof(elems[0]));
        }

        /** This function must be called before the get* functions. */
        void prepare() const
        {
            std::sort(elems, elems + count);
        }

        UInt16 get(double level) const
        {
            return level != 1
                ? elems[static_cast<size_t>(count * level)]
                : elems[count - 1];
        }

        template <typename ResultType>
        void getMany(const double * levels, size_t size, ResultType * result) const
        {
            const double * levels_end = levels + size;

            while (levels != levels_end)
            {
                *result = get(*levels);
                ++levels;
                ++result;
            }
        }

        /// The same, but NaN is returned for an empty state.
        float getFloat(double level) const
        {
            return count
                ? get(level)
                : std::numeric_limits<float>::quiet_NaN();
        }

        void getManyFloat(const double * levels, size_t size, float * result) const
        {
            if (count)
                getMany(levels, size, result);
            else
                for (size_t i = 0; i < size; ++i)
                    result[i] = std::numeric_limits<float>::quiet_NaN();
        }
    };


    /** Helper structure for optimization in the case of a moderate number of values:
      * a flat array, allocated separately, into which all encountered values are placed in succession.
      */
    struct QuantileTimingMedium
    {
        /// sizeof - 24 bytes.
        using Array = PODArray<UInt16, 128>;
        mutable Array elems;    /// mutable because sorting the array is not considered a state change.

        QuantileTimingMedium() {}
        QuantileTimingMedium(const UInt16 * begin, const UInt16 * end) : elems(begin, end) {}

        void insert(UInt64 x)
        {
            if (unlikely(x > BIG_THRESHOLD))
                x = BIG_THRESHOLD;

            elems.emplace_back(x);
        }

        void merge(const QuantileTimingMedium & rhs)
        {
            elems.insert(rhs.elems.begin(), rhs.elems.end());
        }

        void serialize(WriteBuffer & buf) const
        {
            writeBinary(elems.size(), buf);
            buf.write(reinterpret_cast<const char *>(elems.data()), elems.size() * sizeof(elems[0]));
        }

        void deserialize(ReadBuffer & buf)
        {
            size_t size = 0;
            readBinary(size, buf);
            elems.resize(size);
            buf.readStrict(reinterpret_cast<char *>(elems.data()), size * sizeof(elems[0]));
        }

        UInt16 get(double level) const
        {
            UInt16 quantile = 0;

            if (!elems.empty())
            {
                size_t n = level < 1
                    ? level * elems.size()
                    : (elems.size() - 1);

                /// Partially sorting the array is not considered a violation of constness.
                auto & array = const_cast<Array &>(elems);
                std::nth_element(array.begin(), array.begin() + n, array.end());
                quantile = array[n];
            }

            return quantile;
        }

        template <typename ResultType>
        void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
        {
            size_t prev_n = 0;
            auto & array = const_cast<Array &>(elems);
            for (size_t i = 0; i < size; ++i)
            {
                auto level_index = levels_permutation[i];
                auto level = levels[level_index];

                size_t n = level < 1
                    ? level * elems.size()
                    : (elems.size() - 1);

                std::nth_element(array.begin() + prev_n, array.begin() + n, array.end());

                result[level_index] = array[n];
                prev_n = n;
            }
        }

        /// The same, but NaN is returned for an empty state.
        float getFloat(double level) const
        {
            return !elems.empty()
                ? get(level)
                : std::numeric_limits<float>::quiet_NaN();
        }

        void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
        {
            if (!elems.empty())
                getMany(levels, levels_permutation, size, result);
            else
                for (size_t i = 0; i < size; ++i)
                    result[i] = std::numeric_limits<float>::quiet_NaN();
        }
    };


    #define SMALL_THRESHOLD 1024
    #define BIG_PRECISION 16
    #define BIG_SIZE ((BIG_THRESHOLD - SMALL_THRESHOLD) / BIG_PRECISION)

    #define SIZE_OF_LARGE_WITHOUT_COUNT ((SMALL_THRESHOLD + BIG_SIZE) * sizeof(UInt64))
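    /// With these constants: BIG_SIZE = (30000 - 1024) / 16 = 1811 buckets, so the two histogram
    /// arrays together occupy (1024 + 1811) * sizeof(UInt64) = 22680 bytes - hence the size quoted below.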


    /** For a large number of values. The size is about 22 680 bytes.
      */
    class QuantileTimingLarge
    {
    private:
        /// Total number of values.
        UInt64 count;
        /// Using UInt64 is very wasteful.
        /// But UInt32 is definitely not enough, and it's too awkward to invent a 6-byte type.

        /// The number of occurrences for each value less than SMALL_THRESHOLD.
        UInt64 count_small[SMALL_THRESHOLD];

        /// The number of occurrences for each value from SMALL_THRESHOLD to BIG_THRESHOLD, rounded to BIG_PRECISION.
        UInt64 count_big[BIG_SIZE];

        /// Map an index in the `count_big` array back to a representative value.
        static inline UInt16 indexInBigToValue(size_t i)
        {
            return (i * BIG_PRECISION) + SMALL_THRESHOLD
                + (intHash32<0>(i) % BIG_PRECISION - (BIG_PRECISION / 2));    /// A small randomization so that it is not noticeable that all the values are multiples of BIG_PRECISION.
        }
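        /// For example, indexInBigToValue(0) returns a value in [1016, 1031] (roughly 1024), indexInBigToValue(1)
        /// a value near 1040, and so on; the exact offset within each 16 ms bucket depends on the hash of i.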

        /// Allows iterating over the histogram buckets, skipping zeros.
        class Iterator
        {
        private:
            const UInt64 * begin;
            const UInt64 * pos;
            const UInt64 * end;

            void adjust()
            {
                while (isValid() && 0 == *pos)
                    ++pos;
            }

        public:
            Iterator(const QuantileTimingLarge & parent)
                : begin(parent.count_small), pos(begin), end(&parent.count_big[BIG_SIZE])
            {
                adjust();
            }

            bool isValid() const { return pos < end; }

            void next()
            {
                ++pos;
                adjust();
            }

            UInt64 count() const { return *pos; }

            UInt16 key() const
            {
                return pos - begin < SMALL_THRESHOLD
                    ? pos - begin
                    : indexInBigToValue(pos - begin - SMALL_THRESHOLD);
            }
        };

    public:
        QuantileTimingLarge()
        {
            memset(this, 0, sizeof(*this));
        }

        void insert(UInt64 x) noexcept
        {
            insertWeighted(x, 1);
        }

        void insertWeighted(UInt64 x, size_t weight) noexcept
        {
            count += weight;

            if (x < SMALL_THRESHOLD)
                count_small[x] += weight;
            else if (x < BIG_THRESHOLD)
                count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION] += weight;
        }

        void merge(const QuantileTimingLarge & rhs) noexcept
        {
            count += rhs.count;

            for (size_t i = 0; i < SMALL_THRESHOLD; ++i)
                count_small[i] += rhs.count_small[i];

            for (size_t i = 0; i < BIG_SIZE; ++i)
                count_big[i] += rhs.count_big[i];
        }

        void serialize(WriteBuffer & buf) const
        {
            writeBinary(count, buf);

            if (count * 2 > SMALL_THRESHOLD + BIG_SIZE)
            {
                /// Simple serialization for the densely populated case.
                buf.write(reinterpret_cast<const char *>(this) + sizeof(count), SIZE_OF_LARGE_WITHOUT_COUNT);
            }
            else
            {
                /// More compact serialization for the sparse case.

                for (size_t i = 0; i < SMALL_THRESHOLD; ++i)
                {
                    if (count_small[i])
                    {
                        writeBinary(UInt16(i), buf);
                        writeBinary(count_small[i], buf);
                    }
                }

                for (size_t i = 0; i < BIG_SIZE; ++i)
                {
                    if (count_big[i])
                    {
                        writeBinary(UInt16(i + SMALL_THRESHOLD), buf);
                        writeBinary(count_big[i], buf);
                    }
                }

                /// Marks the end of the data.
                writeBinary(UInt16(BIG_THRESHOLD), buf);
            }
        }

        void deserialize(ReadBuffer & buf)
        {
            readBinary(count, buf);

            if (count * 2 > SMALL_THRESHOLD + BIG_SIZE)
            {
                buf.readStrict(reinterpret_cast<char *>(this) + sizeof(count), SIZE_OF_LARGE_WITHOUT_COUNT);
            }
            else
            {
                while (true)
                {
                    UInt16 index = 0;
                    readBinary(index, buf);
                    if (index == BIG_THRESHOLD)
                        break;

                    UInt64 elem_count = 0;
                    readBinary(elem_count, buf);

                    if (index < SMALL_THRESHOLD)
                        count_small[index] = elem_count;
                    else
                        count_big[index - SMALL_THRESHOLD] = elem_count;
                }
            }
        }


        /// Get the value of the quantile at `level`. The level must be between 0 and 1.
        UInt16 get(double level) const
        {
            UInt64 pos = std::ceil(count * level);

            UInt64 accumulated = 0;
            Iterator it(*this);

            while (it.isValid())
            {
                accumulated += it.count();

                if (accumulated >= pos)
                    break;

                it.next();
            }

            return it.isValid() ? it.key() : BIG_THRESHOLD;
        }
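
        /// Example for get(): with counts {5 ms: 3, 7 ms: 5} and level = 0.5, pos = ceil(8 * 0.5) = 4;
        /// the accumulated count reaches 4 inside the 7 ms bucket, so 7 is returned.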

        /// Get the values of the quantiles at the `levels` levels. Write `size` results starting at the `result` address.
        /// `indices` is an array of indices into `levels` such that the corresponding levels go in ascending order.
        template <typename ResultType>
        void getMany(const double * levels, const size_t * indices, size_t size, ResultType * result) const
        {
            const auto indices_end = indices + size;
            auto index = indices;

            UInt64 pos = std::ceil(count * levels[*index]);

            UInt64 accumulated = 0;
            Iterator it(*this);

            while (it.isValid())
            {
                accumulated += it.count();

                while (accumulated >= pos)
                {
                    result[*index] = it.key();
                    ++index;

                    if (index == indices_end)
                        return;

                    pos = std::ceil(count * levels[*index]);
                }

                it.next();
            }

            while (index != indices_end)
            {
                result[*index] = std::numeric_limits<ResultType>::max() < BIG_THRESHOLD
                    ? std::numeric_limits<ResultType>::max() : BIG_THRESHOLD;
                ++index;
            }
        }
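
        /// Example of the `indices` argument: for levels = {0.9, 0.5} one passes indices = {1, 0},
        /// so the levels are visited in ascending order while each result still lands at its original position in `result`.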

        /// The same, but NaN is returned for an empty state.
        float getFloat(double level) const
        {
            return count
                ? get(level)
                : std::numeric_limits<float>::quiet_NaN();
        }

        void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
        {
            if (count)
                getMany(levels, levels_permutation, size, result);
            else
                for (size_t i = 0; i < size; ++i)
                    result[i] = std::numeric_limits<float>::quiet_NaN();
        }
    };
}


/** sizeof - 64 bytes.
  * If that is not enough, it additionally allocates up to about 22 KB of memory.
  */
template <typename>    /// The unused template parameter is for AggregateFunctionQuantile.
class QuantileTiming : private boost::noncopyable
{
private:
    union
    {
        detail::QuantileTimingTiny tiny;
        detail::QuantileTimingMedium medium;
        detail::QuantileTimingLarge * large;
    };

    enum class Kind : UInt8
    {
        Tiny = 1,
        Medium = 2,
        Large = 3
    };

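    /// The active union member is encoded in tiny.count: values 0..TINY_MAX_ELEMS mean Tiny,
    /// TINY_MAX_ELEMS + 1 means Medium, and TINY_MAX_ELEMS + 2 means Large
    /// (the conversion functions below set these sentinel values).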
    Kind which() const
    {
        if (tiny.count <= TINY_MAX_ELEMS)
            return Kind::Tiny;
        if (tiny.count == TINY_MAX_ELEMS + 1)
            return Kind::Medium;
        return Kind::Large;
    }

    void tinyToMedium()
    {
        detail::QuantileTimingTiny tiny_copy = tiny;
        new (&medium) detail::QuantileTimingMedium(tiny_copy.elems, tiny_copy.elems + tiny_copy.count);
        tiny.count = TINY_MAX_ELEMS + 1;
    }

    void mediumToLarge()
    {
        /// While the data is being copied from `medium`, we cannot set the `large` pointer yet (otherwise it would overwrite part of that data, since they share the union).
        detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;

        for (const auto & elem : medium.elems)
            tmp_large->insert(elem);    /// `insert` cannot throw, so we don't have to worry about leaking `tmp_large`.

        medium.~QuantileTimingMedium();
        large = tmp_large;
        tiny.count = TINY_MAX_ELEMS + 2;    /// `large` will be deleted in the destructor.
    }

    void tinyToLarge()
    {
        /// While the data is being copied from `tiny`, we cannot set the `large` pointer yet (otherwise it would overwrite part of that data, since they share the union).
        detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;

        for (size_t i = 0; i < tiny.count; ++i)
            tmp_large->insert(tiny.elems[i]);    /// `insert` cannot throw, so we don't have to worry about leaking `tmp_large`.

        large = tmp_large;
        tiny.count = TINY_MAX_ELEMS + 2;    /// `large` will be deleted in the destructor.
    }

    bool mediumIsWorthToConvertToLarge() const
    {
        return medium.elems.size() >= sizeof(detail::QuantileTimingLarge) / sizeof(medium.elems[0]) / 2;
    }
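    /// Note on the threshold above: sizeof(detail::QuantileTimingLarge) is 8 * (1 + 1024 + 1811) = 22688 bytes,
    /// and the elements are 2 bytes, so the conversion happens at 22688 / 2 / 2 = 5672 elements -
    /// the "about 5670" mentioned in the comment at the top of the file.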

public:
    QuantileTiming()
    {
        tiny.count = 0;
    }

    ~QuantileTiming()
    {
        Kind kind = which();

        if (kind == Kind::Medium)
        {
            medium.~QuantileTimingMedium();
        }
        else if (kind == Kind::Large)
        {
            delete large;
        }
    }

    void add(UInt64 x)
    {
        if (tiny.count < TINY_MAX_ELEMS)
        {
            tiny.insert(x);
        }
        else
        {
            if (unlikely(tiny.count == TINY_MAX_ELEMS))
                tinyToMedium();

            if (which() == Kind::Medium)
            {
                if (unlikely(mediumIsWorthToConvertToLarge()))
                {
                    mediumToLarge();
                    large->insert(x);
                }
                else
                    medium.insert(x);
            }
            else
                large->insert(x);
        }
    }

    void add(UInt64 x, size_t weight)
    {
        /// NOTE: The first condition is there to avoid overflow.
        if (weight < TINY_MAX_ELEMS && tiny.count + weight <= TINY_MAX_ELEMS)
        {
            for (size_t i = 0; i < weight; ++i)
                tiny.insert(x);
        }
        else
        {
            if (unlikely(tiny.count <= TINY_MAX_ELEMS))
                tinyToLarge();    /// For the weighted variant we do not use `medium` - presumably, it is impractical.
            else if (which() == Kind::Medium)
                mediumToLarge();    /// The state may already be `medium` (e.g. after a merge); it must also be converted, since `large` is used below.

            large->insertWeighted(x, weight);
        }
    }

    /// NOTE Too complicated.
    void merge(const QuantileTiming & rhs)
    {
        if (tiny.count + rhs.tiny.count <= TINY_MAX_ELEMS)
        {
            tiny.merge(rhs.tiny);
        }
        else
        {
            auto kind = which();
            auto rhs_kind = rhs.which();

            /// If the state we are merging with has a larger data structure, convert the current state to the same kind.
            if (kind == Kind::Tiny && rhs_kind == Kind::Medium)
            {
                tinyToMedium();
                kind = Kind::Medium;
            }
            else if (kind == Kind::Tiny && rhs_kind == Kind::Large)
            {
                tinyToLarge();
                kind = Kind::Large;
            }
            else if (kind == Kind::Medium && rhs_kind == Kind::Large)
            {
                mediumToLarge();
                kind = Kind::Large;
            }
            /// The case when both states are tiny, but after merging they become medium.
            else if (kind == Kind::Tiny && rhs_kind == Kind::Tiny)
            {
                tinyToMedium();
                kind = Kind::Medium;
            }

            if (kind == Kind::Medium && rhs_kind == Kind::Medium)
            {
                medium.merge(rhs.medium);
            }
            else if (kind == Kind::Large && rhs_kind == Kind::Large)
            {
                large->merge(*rhs.large);
            }
            else if (kind == Kind::Medium && rhs_kind == Kind::Tiny)
            {
                medium.elems.insert(rhs.tiny.elems, rhs.tiny.elems + rhs.tiny.count);
            }
            else if (kind == Kind::Large && rhs_kind == Kind::Tiny)
            {
                for (size_t i = 0; i < rhs.tiny.count; ++i)
                    large->insert(rhs.tiny.elems[i]);
            }
            else if (kind == Kind::Large && rhs_kind == Kind::Medium)
            {
                for (const auto & elem : rhs.medium.elems)
                    large->insert(elem);
            }
            else
                throw Exception("Logical error in QuantileTiming::merge function: not all cases are covered", ErrorCodes::LOGICAL_ERROR);

            /// For determinism, we should always convert to `large` once the size condition is reached,
            /// regardless of the merge order.
            if (kind == Kind::Medium && unlikely(mediumIsWorthToConvertToLarge()))
            {
                mediumToLarge();
            }
        }
    }

    void serialize(WriteBuffer & buf) const
    {
        auto kind = which();
        DB::writePODBinary(kind, buf);

        if (kind == Kind::Tiny)
            tiny.serialize(buf);
        else if (kind == Kind::Medium)
            medium.serialize(buf);
        else
            large->serialize(buf);
    }

    /// Called for an empty object.
    void deserialize(ReadBuffer & buf)
    {
        Kind kind;
        DB::readPODBinary(kind, buf);

        if (kind == Kind::Tiny)
        {
            tiny.deserialize(buf);
        }
        else if (kind == Kind::Medium)
        {
            tinyToMedium();
            medium.deserialize(buf);
        }
        else if (kind == Kind::Large)
        {
            tinyToLarge();
            large->deserialize(buf);
        }
    }

    /// Get the value of the quantile at `level`. The level must be between 0 and 1.
    UInt16 get(double level) const
    {
        Kind kind = which();

        if (kind == Kind::Tiny)
        {
            tiny.prepare();
            return tiny.get(level);
        }
        else if (kind == Kind::Medium)
        {
            return medium.get(level);
        }
        else
        {
            return large->get(level);
        }
    }

    /// Get the values of the quantiles at the `levels` levels. Write `size` results starting at the `result` address.
    template <typename ResultType>
    void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
    {
        Kind kind = which();

        if (kind == Kind::Tiny)
        {
            tiny.prepare();
            tiny.getMany(levels, size, result);
        }
        else if (kind == Kind::Medium)
        {
            medium.getMany(levels, levels_permutation, size, result);
        }
        else /*if (kind == Kind::Large)*/
        {
            large->getMany(levels, levels_permutation, size, result);
        }
    }

    /// The same, but NaN is returned for an empty state.
    float getFloat(double level) const
    {
        return tiny.count
            ? get(level)
            : std::numeric_limits<float>::quiet_NaN();
    }

    void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
    {
        if (tiny.count)
            getMany(levels, levels_permutation, size, result);
        else
            for (size_t i = 0; i < size; ++i)
                result[i] = std::numeric_limits<float>::quiet_NaN();
    }
};

#undef SMALL_THRESHOLD
#undef BIG_THRESHOLD
#undef BIG_SIZE
#undef BIG_PRECISION
#undef SIZE_OF_LARGE_WITHOUT_COUNT
#undef TINY_MAX_ELEMS

}