#pragma once

#include <algorithm>
#include <cmath>
#include <cstring>
#include <limits>
#include <new>

#include <boost/noncopyable.hpp>

#include <Common/HashTable/Hash.h>
#include <Common/PODArray.h>
#include <Common/Exception.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <common/likely.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/** Calculates the quantile of times in milliseconds, for values less than 30 seconds.
  * If a value is greater than 30 seconds, it is clamped to 30 seconds.
  *
  * If the total number of values is not greater than about 5670, the calculation is exact.
  *
  * Otherwise:
  *  - if the time is less than 1024 ms, the calculation is exact;
  *  - otherwise, the result is rounded to a multiple of 16 ms.
  *
  * Three different data structures are used:
  * - a flat array (of all encountered values) of fixed length, allocated in place, 64 bytes in size; stores 0..31 values;
  * - a flat array (of all encountered values), allocated separately, of increasing length;
  * - a histogram (that is, value -> count), consisting of two parts:
  *   -- for values from 0 to 1023 - with a step of 1;
  *   -- for values from 1024 to 30,000 - with a step of 16.
  */

#define TINY_MAX_ELEMS 31
#define BIG_THRESHOLD 30000
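
/// Illustrative sketch of the precision contract described above (not part of the original
/// header); QuantileTiming<T> is defined at the end of this file, Float32 is the usual
/// ClickHouse alias for float:
///
///     QuantileTiming<Float32> q;
///     q.add(50000);   /// values above BIG_THRESHOLD are clamped to 30000
///     q.add(500);     /// values below 1024 ms are always stored exactly
///     /// While only a few values have been added, the flat arrays are used and quantiles are exact.
///     /// After roughly 5670 values the histogram is used, and values >= 1024 ms are only
///     /// resolved up to ~16 ms buckets.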

namespace detail
{
    /** Helper structure for optimization in the case of a small number of values:
      * a flat array of fixed size "on the stack", into which all encountered values are placed in succession.
      * Size - 64 bytes. Must be a POD type (it is used in a union).
      */
    struct QuantileTimingTiny
    {
        mutable UInt16 elems[TINY_MAX_ELEMS];    /// mutable because array sorting is not considered a state change.
        /// It is important that `count` is at the end of the structure, since the beginning of the structure
        /// will later be overwritten by other objects of the union.
        /// You must initialize it to zero yourself:
        /// the `count` field is reused even when the union contains other structures
        /// (whose size is smaller than the offset of this field).
        UInt16 count;

        /// Can only be used while `count < TINY_MAX_ELEMS`.
        void insert(UInt64 x)
        {
            if (unlikely(x > BIG_THRESHOLD))
                x = BIG_THRESHOLD;

            elems[count] = x;
            ++count;
        }

        /// Can only be used while `count + rhs.count <= TINY_MAX_ELEMS`.
        void merge(const QuantileTimingTiny & rhs)
        {
            for (size_t i = 0; i < rhs.count; ++i)
            {
                elems[count] = rhs.elems[i];
                ++count;
            }
        }

        void serialize(WriteBuffer & buf) const
        {
            writeBinary(count, buf);
            buf.write(reinterpret_cast<const char *>(elems), count * sizeof(elems[0]));
        }

        void deserialize(ReadBuffer & buf)
        {
            readBinary(count, buf);
            buf.readStrict(reinterpret_cast<char *>(elems), count * sizeof(elems[0]));
        }

        /** This function must be called before any of the get* functions. */
        void prepare() const
        {
            std::sort(elems, elems + count);
        }

        UInt16 get(double level) const
        {
            return level != 1
                ? elems[static_cast<size_t>(count * level)]
                : elems[count - 1];
        }

        template <typename ResultType>
        void getMany(const double * levels, size_t size, ResultType * result) const
        {
            const double * levels_end = levels + size;

            while (levels != levels_end)
            {
                *result = get(*levels);
                ++levels;
                ++result;
            }
        }

        /// The same, but in the case of an empty state, NaN is returned.
        float getFloat(double level) const
        {
            return count
                ? get(level)
                : std::numeric_limits<float>::quiet_NaN();
        }

        void getManyFloat(const double * levels, size_t size, float * result) const
        {
            if (count)
                getMany(levels, size, result);
            else
                for (size_t i = 0; i < size; ++i)
                    result[i] = std::numeric_limits<float>::quiet_NaN();
        }
    };
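
    /// Illustrative call sequence for QuantileTimingTiny (a sketch, not part of the original header;
    /// in practice this struct is driven by QuantileTiming below rather than used directly):
    ///
    ///     detail::QuantileTimingTiny tiny;
    ///     tiny.count = 0;                 /// must be zero-initialized by the owner
    ///     tiny.insert(100);
    ///     tiny.insert(200);
    ///     tiny.prepare();                 /// sort before calling any get* function
    ///     UInt16 q = tiny.get(0.5);       /// == 200: index = static_cast<size_t>(count * level)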


    /** Helper structure for optimization in the case of a moderate number of values:
      * a flat array, allocated separately, into which all encountered values are placed in succession.
      */
    struct QuantileTimingMedium
    {
        /// sizeof - 24 bytes.
        using Array = PODArray<UInt16, 128>;
        mutable Array elems;    /// mutable because array sorting is not considered a state change.

        QuantileTimingMedium() {}
        QuantileTimingMedium(const UInt16 * begin, const UInt16 * end) : elems(begin, end) {}

        void insert(UInt64 x)
        {
            if (unlikely(x > BIG_THRESHOLD))
                x = BIG_THRESHOLD;

            elems.emplace_back(x);
        }

        void merge(const QuantileTimingMedium & rhs)
        {
            elems.insert(rhs.elems.begin(), rhs.elems.end());
        }

        void serialize(WriteBuffer & buf) const
        {
            writeBinary(elems.size(), buf);
            buf.write(reinterpret_cast<const char *>(elems.data()), elems.size() * sizeof(elems[0]));
        }

        void deserialize(ReadBuffer & buf)
        {
            size_t size = 0;
            readBinary(size, buf);
            elems.resize(size);
            buf.readStrict(reinterpret_cast<char *>(elems.data()), size * sizeof(elems[0]));
        }

        UInt16 get(double level) const
        {
            UInt16 quantile = 0;

            if (!elems.empty())
            {
                size_t n = level < 1
                    ? level * elems.size()
                    : (elems.size() - 1);

                /// Sorting the array is not considered a violation of constness.
                auto & array = const_cast<Array &>(elems);
                std::nth_element(array.begin(), array.begin() + n, array.end());
                quantile = array[n];
            }

            return quantile;
        }

        template <typename ResultType>
        void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
        {
            size_t prev_n = 0;
            auto & array = const_cast<Array &>(elems);
            for (size_t i = 0; i < size; ++i)
            {
                auto level_index = levels_permutation[i];
                auto level = levels[level_index];

                size_t n = level < 1
                    ? level * elems.size()
                    : (elems.size() - 1);

                std::nth_element(array.begin() + prev_n, array.begin() + n, array.end());

                result[level_index] = array[n];
                prev_n = n;
            }
        }

        /// The same, but in the case of an empty state, NaN is returned.
        float getFloat(double level) const
        {
            return !elems.empty()
                ? get(level)
                : std::numeric_limits<float>::quiet_NaN();
        }

        void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
        {
            if (!elems.empty())
                getMany(levels, levels_permutation, size, result);
            else
                for (size_t i = 0; i < size; ++i)
                    result[i] = std::numeric_limits<float>::quiet_NaN();
        }
    };


#define SMALL_THRESHOLD 1024
#define BIG_SIZE ((BIG_THRESHOLD - SMALL_THRESHOLD) / BIG_PRECISION)
#define BIG_PRECISION 16

#define SIZE_OF_LARGE_WITHOUT_COUNT ((SMALL_THRESHOLD + BIG_SIZE) * sizeof(UInt64))
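
/// Worked sizes for the constants above (arithmetic only, for orientation):
///     BIG_SIZE = (30000 - 1024) / 16 = 1811 buckets,
///     SIZE_OF_LARGE_WITHOUT_COUNT = (1024 + 1811) * sizeof(UInt64) = 22680 bytes,
/// which is where the "about 22 680 bytes" figure in the comment below comes from
/// (plus 8 bytes for the `count` field itself).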


    /** For a large number of values. The size is about 22 680 bytes.
      */
    class QuantileTimingLarge
    {
    private:
        /// Total number of values.
        UInt64 count;
        /// Using UInt64 here is very wasteful.
        /// But UInt32 is definitely not enough, and inventing a 6-byte type is too much trouble.

        /// Histogram of the number of occurrences for each value less than SMALL_THRESHOLD.
        UInt64 count_small[SMALL_THRESHOLD];

        /// Histogram of the number of occurrences for each value from SMALL_THRESHOLD to BIG_THRESHOLD, rounded to BIG_PRECISION.
        UInt64 count_big[BIG_SIZE];

        /// Get the value corresponding to an index in the `count_big` array.
        static inline UInt16 indexInBigToValue(size_t i)
        {
            return (i * BIG_PRECISION) + SMALL_THRESHOLD
                + (intHash32<0>(i) % BIG_PRECISION - (BIG_PRECISION / 2));    /// A small randomization so that it is not noticeable that all the values are even.
        }
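
        /// Worked example of the mapping above (a sketch; the exact jitter depends on intHash32):
        /// the offset (intHash32<0>(i) % 16 - 8) lies in [-8, 7], so
        ///     i = 0 maps to some fixed value in [1016, 1031] (around 1024),
        ///     i = 1 maps to some fixed value in [1032, 1047] (around 1040),
        /// i.e. each bucket reports one deterministic, slightly jittered representative value.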

        /// Allows iterating over the histogram buckets, skipping empty ones.
        /// Walks `count_small` and `count_big` as one contiguous block (they are adjacent members).
        class Iterator
        {
        private:
            const UInt64 * begin;
            const UInt64 * pos;
            const UInt64 * end;

            void adjust()
            {
                while (isValid() && 0 == *pos)
                    ++pos;
            }

        public:
            Iterator(const QuantileTimingLarge & parent)
                : begin(parent.count_small), pos(begin), end(&parent.count_big[BIG_SIZE])
            {
                adjust();
            }

            bool isValid() const { return pos < end; }

            void next()
            {
                ++pos;
                adjust();
            }

            UInt64 count() const { return *pos; }

            UInt16 key() const
            {
                return pos - begin < SMALL_THRESHOLD
                    ? pos - begin
                    : indexInBigToValue(pos - begin - SMALL_THRESHOLD);
            }
        };

    public:
        QuantileTimingLarge()
        {
            memset(this, 0, sizeof(*this));
        }

        void insert(UInt64 x) noexcept
        {
            insertWeighted(x, 1);
        }

        void insertWeighted(UInt64 x, size_t weight) noexcept
        {
            count += weight;

            if (x < SMALL_THRESHOLD)
                count_small[x] += weight;
            else if (x < BIG_THRESHOLD)
                count_big[(x - SMALL_THRESHOLD) / BIG_PRECISION] += weight;
            /// Values >= BIG_THRESHOLD are reflected only in `count`; `get` reports them as BIG_THRESHOLD.
        }

        void merge(const QuantileTimingLarge & rhs) noexcept
        {
            count += rhs.count;

            for (size_t i = 0; i < SMALL_THRESHOLD; ++i)
                count_small[i] += rhs.count_small[i];

            for (size_t i = 0; i < BIG_SIZE; ++i)
                count_big[i] += rhs.count_big[i];
        }

        void serialize(WriteBuffer & buf) const
        {
            writeBinary(count, buf);

            if (count * 2 > SMALL_THRESHOLD + BIG_SIZE)
            {
                /// Simple serialization for the dense case.
                buf.write(reinterpret_cast<const char *>(this) + sizeof(count), SIZE_OF_LARGE_WITHOUT_COUNT);
            }
            else
            {
                /// More compact serialization for the sparse case.

                for (size_t i = 0; i < SMALL_THRESHOLD; ++i)
                {
                    if (count_small[i])
                    {
                        writeBinary(UInt16(i), buf);
                        writeBinary(count_small[i], buf);
                    }
                }

                for (size_t i = 0; i < BIG_SIZE; ++i)
                {
                    if (count_big[i])
                    {
                        writeBinary(UInt16(i + SMALL_THRESHOLD), buf);
                        writeBinary(count_big[i], buf);
                    }
                }

                /// Marks the end of the data.
                writeBinary(UInt16(BIG_THRESHOLD), buf);
            }
        }
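
        /// Serialized layout, as a sketch of what serialize/deserialize produce and expect:
        ///     count : UInt64
        ///     then either the raw `count_small` and `count_big` arrays (dense case),
        ///     or a sequence of (key : UInt16, count : UInt64) pairs terminated by key == BIG_THRESHOLD,
        ///     where keys < SMALL_THRESHOLD are values themselves and keys >= SMALL_THRESHOLD are
        ///     SMALL_THRESHOLD + (index into count_big).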

        void deserialize(ReadBuffer & buf)
        {
            readBinary(count, buf);

            if (count * 2 > SMALL_THRESHOLD + BIG_SIZE)
            {
                buf.readStrict(reinterpret_cast<char *>(this) + sizeof(count), SIZE_OF_LARGE_WITHOUT_COUNT);
            }
            else
            {
                while (true)
                {
                    UInt16 index = 0;
                    readBinary(index, buf);
                    if (index == BIG_THRESHOLD)
                        break;

                    UInt64 elem_count = 0;
                    readBinary(elem_count, buf);

                    if (index < SMALL_THRESHOLD)
                        count_small[index] = elem_count;
                    else
                        count_big[index - SMALL_THRESHOLD] = elem_count;
                }
            }
        }


        /// Get the value of the `level` quantile. The level must be between 0 and 1.
        UInt16 get(double level) const
        {
            UInt64 pos = std::ceil(count * level);

            UInt64 accumulated = 0;
            Iterator it(*this);

            while (it.isValid())
            {
                accumulated += it.count();

                if (accumulated >= pos)
                    break;

                it.next();
            }

            return it.isValid() ? it.key() : BIG_THRESHOLD;
        }

        /// Get the values of `size` quantile levels. Writes `size` results starting at the address `result`.
        /// `indices` is a permutation of level indices such that the corresponding levels go in ascending order.
        template <typename ResultType>
        void getMany(const double * levels, const size_t * indices, size_t size, ResultType * result) const
        {
            const auto indices_end = indices + size;
            auto index = indices;

            UInt64 pos = std::ceil(count * levels[*index]);

            UInt64 accumulated = 0;
            Iterator it(*this);

            while (it.isValid())
            {
                accumulated += it.count();

                while (accumulated >= pos)
                {
                    result[*index] = it.key();
                    ++index;

                    if (index == indices_end)
                        return;

                    pos = std::ceil(count * levels[*index]);
                }

                it.next();
            }

            /// The remaining levels fall beyond the histogram; report BIG_THRESHOLD (clamped to the result type).
            while (index != indices_end)
            {
                result[*index] = std::numeric_limits<ResultType>::max() < BIG_THRESHOLD
                    ? std::numeric_limits<ResultType>::max() : BIG_THRESHOLD;
                ++index;
            }
        }

        /// The same, but in the case of an empty state, NaN is returned.
        float getFloat(double level) const
        {
            return count
                ? get(level)
                : std::numeric_limits<float>::quiet_NaN();
        }

        void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
        {
            if (count)
                getMany(levels, levels_permutation, size, result);
            else
                for (size_t i = 0; i < size; ++i)
                    result[i] = std::numeric_limits<float>::quiet_NaN();
        }
    };
}


/** sizeof - 64 bytes.
  * If that is not enough, it additionally allocates up to about 22 KB of memory.
  */
template <typename>    /// Unused template parameter is for AggregateFunctionQuantile.
class QuantileTiming : private boost::noncopyable
{
private:
    union
    {
        detail::QuantileTimingTiny tiny;
        detail::QuantileTimingMedium medium;
        detail::QuantileTimingLarge * large;
    };

    enum class Kind : UInt8
    {
        Tiny = 1,
        Medium = 2,
        Large = 3
    };

    Kind which() const
    {
        if (tiny.count <= TINY_MAX_ELEMS)
            return Kind::Tiny;
        if (tiny.count == TINY_MAX_ELEMS + 1)
            return Kind::Medium;
        return Kind::Large;
    }
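
    /// How the discriminant works (a descriptive sketch of the code above, not new behaviour):
    /// `tiny.count` sits at the end of the 64-byte `tiny` object, past the bytes used by
    /// `medium` (24 bytes) and `large` (a pointer), so it survives as a tag for the union:
    ///     0..TINY_MAX_ELEMS       -> Kind::Tiny   (actual number of elements)
    ///     TINY_MAX_ELEMS + 1 (32) -> Kind::Medium (set by tinyToMedium)
    ///     TINY_MAX_ELEMS + 2 (33) -> Kind::Large  (set by tinyToLarge / mediumToLarge)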

    void tinyToMedium()
    {
        detail::QuantileTimingTiny tiny_copy = tiny;
        new (&medium) detail::QuantileTimingMedium(tiny_copy.elems, tiny_copy.elems + tiny_copy.count);
        tiny.count = TINY_MAX_ELEMS + 1;
    }

    void mediumToLarge()
    {
        /// While the data is being copied from `medium`, we must not assign to `large` yet
        /// (it shares memory with `medium` in the union and would overwrite its data).
        detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;

        for (const auto & elem : medium.elems)
            tmp_large->insert(elem);    /// This cannot throw, so `tmp_large` will not leak.

        medium.~QuantileTimingMedium();
        large = tmp_large;
        tiny.count = TINY_MAX_ELEMS + 2;    /// `large` will be deleted in the destructor.
    }

    void tinyToLarge()
    {
        /// While the data is being copied from `tiny`, we must not assign to `large` yet
        /// (it shares memory with `tiny` in the union and would overwrite its data).
        detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;

        for (size_t i = 0; i < tiny.count; ++i)
            tmp_large->insert(tiny.elems[i]);    /// This cannot throw, so `tmp_large` will not leak.

        large = tmp_large;
        tiny.count = TINY_MAX_ELEMS + 2;    /// `large` will be deleted in the destructor.
    }

    bool mediumIsWorthToConvertToLarge() const
    {
        return medium.elems.size() >= sizeof(detail::QuantileTimingLarge) / sizeof(medium.elems[0]) / 2;
    }
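
    /// Worked threshold for the check above (arithmetic only; assumes no struct padding, which holds
    /// here since all members of QuantileTimingLarge are UInt64): sizeof(QuantileTimingLarge) is
    /// 8 + 22680 = 22688 bytes, so the conversion happens at 22688 / sizeof(UInt16) / 2 = 5672
    /// elements - the "about 5670" mentioned in the comment at the top of this file.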

public:
    QuantileTiming()
    {
        tiny.count = 0;
    }

    ~QuantileTiming()
    {
        Kind kind = which();

        if (kind == Kind::Medium)
        {
            medium.~QuantileTimingMedium();
        }
        else if (kind == Kind::Large)
        {
            delete large;
        }
    }

    void add(UInt64 x)
    {
        if (tiny.count < TINY_MAX_ELEMS)
        {
            tiny.insert(x);
        }
        else
        {
            if (unlikely(tiny.count == TINY_MAX_ELEMS))
                tinyToMedium();

            if (which() == Kind::Medium)
            {
                if (unlikely(mediumIsWorthToConvertToLarge()))
                {
                    mediumToLarge();
                    large->insert(x);
                }
                else
                    medium.insert(x);
            }
            else
                large->insert(x);
        }
    }

    void add(UInt64 x, size_t weight)
    {
        /// NOTE: The first condition is needed to avoid overflow.
        if (weight < TINY_MAX_ELEMS && tiny.count + weight <= TINY_MAX_ELEMS)
        {
            for (size_t i = 0; i < weight; ++i)
                tiny.insert(x);
        }
        else
        {
            /// For the weighted variant we do not use `medium` - it is presumably not worthwhile.
            /// This assumes that weighted and unweighted inserts are not mixed within one state:
            /// if the state were already Medium, `large` below would not be valid.
            if (unlikely(tiny.count <= TINY_MAX_ELEMS))
                tinyToLarge();

            large->insertWeighted(x, weight);
        }
    }

    /// NOTE Too complicated.
    void merge(const QuantileTiming & rhs)
    {
        if (tiny.count + rhs.tiny.count <= TINY_MAX_ELEMS)
        {
            tiny.merge(rhs.tiny);
        }
        else
        {
            auto kind = which();
            auto rhs_kind = rhs.which();

            /// If the state we are merging with has a larger data structure, convert the current state to the same kind.
            if (kind == Kind::Tiny && rhs_kind == Kind::Medium)
            {
                tinyToMedium();
                kind = Kind::Medium;
            }
            else if (kind == Kind::Tiny && rhs_kind == Kind::Large)
            {
                tinyToLarge();
                kind = Kind::Large;
            }
            else if (kind == Kind::Medium && rhs_kind == Kind::Large)
            {
                mediumToLarge();
                kind = Kind::Large;
            }
            /// The case when both states are tiny, but together they become medium.
            else if (kind == Kind::Tiny && rhs_kind == Kind::Tiny)
            {
                tinyToMedium();
                kind = Kind::Medium;
            }

            if (kind == Kind::Medium && rhs_kind == Kind::Medium)
            {
                medium.merge(rhs.medium);
            }
            else if (kind == Kind::Large && rhs_kind == Kind::Large)
            {
                large->merge(*rhs.large);
            }
            else if (kind == Kind::Medium && rhs_kind == Kind::Tiny)
            {
                medium.elems.insert(rhs.tiny.elems, rhs.tiny.elems + rhs.tiny.count);
            }
            else if (kind == Kind::Large && rhs_kind == Kind::Tiny)
            {
                for (size_t i = 0; i < rhs.tiny.count; ++i)
                    large->insert(rhs.tiny.elems[i]);
            }
            else if (kind == Kind::Large && rhs_kind == Kind::Medium)
            {
                for (const auto & elem : rhs.medium.elems)
                    large->insert(elem);
            }
            else
                throw Exception("Logical error in QuantileTiming::merge function: not all cases are covered", ErrorCodes::LOGICAL_ERROR);

            /// For determinism, we should always convert to `large` when the size condition is reached,
            /// regardless of the merge order.
            if (kind == Kind::Medium && unlikely(mediumIsWorthToConvertToLarge()))
            {
                mediumToLarge();
            }
        }
    }

    void serialize(WriteBuffer & buf) const
    {
        auto kind = which();
        DB::writePODBinary(kind, buf);

        if (kind == Kind::Tiny)
            tiny.serialize(buf);
        else if (kind == Kind::Medium)
            medium.serialize(buf);
        else
            large->serialize(buf);
    }

    /// Called for an empty object.
    void deserialize(ReadBuffer & buf)
    {
        Kind kind;
        DB::readPODBinary(kind, buf);

        if (kind == Kind::Tiny)
        {
            tiny.deserialize(buf);
        }
        else if (kind == Kind::Medium)
        {
            tinyToMedium();
            medium.deserialize(buf);
        }
        else if (kind == Kind::Large)
        {
            tinyToLarge();
            large->deserialize(buf);
        }
    }

    /// Get the value of the `level` quantile. The level must be between 0 and 1.
    UInt16 get(double level) const
    {
        Kind kind = which();

        if (kind == Kind::Tiny)
        {
            tiny.prepare();
            return tiny.get(level);
        }
        else if (kind == Kind::Medium)
        {
            return medium.get(level);
        }
        else
        {
            return large->get(level);
        }
    }

    /// Get the values of quantiles for `size` levels from `levels`. Writes `size` results starting at the address `result`.
    /// `levels_permutation` must order the levels in ascending order.
    template <typename ResultType>
    void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
    {
        Kind kind = which();

        if (kind == Kind::Tiny)
        {
            tiny.prepare();
            tiny.getMany(levels, size, result);
        }
        else if (kind == Kind::Medium)
        {
            medium.getMany(levels, levels_permutation, size, result);
        }
        else /*if (kind == Kind::Large)*/
        {
            large->getMany(levels, levels_permutation, size, result);
        }
    }

    /// The same, but in the case of an empty state, NaN is returned.
    float getFloat(double level) const
    {
        return tiny.count
            ? get(level)
            : std::numeric_limits<float>::quiet_NaN();
    }

    void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
    {
        if (tiny.count)
            getMany(levels, levels_permutation, size, result);
        else
            for (size_t i = 0; i < size; ++i)
                result[i] = std::numeric_limits<float>::quiet_NaN();
    }
};
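
/// Usage sketch (illustrative only, not part of the original header). `Float32` here is the
/// usual ClickHouse alias for float; the template argument is unused by QuantileTiming itself.
///
///     QuantileTiming<Float32> timing;
///     for (UInt64 ms : {3, 5, 8, 13, 21})
///         timing.add(ms);
///
///     UInt16 median = timing.get(0.5);     /// == 8 for this data
///     float p99 = timing.getFloat(0.99);   /// getFloat returns NaN only for an empty state
///
///     /// getMany expects the levels plus a permutation of their indices that orders the
///     /// levels ascending, e.g. levels = {0.9, 0.5} with permutation = {1, 0}.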

#undef SMALL_THRESHOLD
#undef BIG_THRESHOLD
#undef BIG_SIZE
#undef BIG_PRECISION
#undef SIZE_OF_LARGE_WITHOUT_COUNT
#undef TINY_MAX_ELEMS

}