1#pragma once
2
3#include <common/Types.h>
4#include <Common/HyperLogLogBiasEstimator.h>
5#include <Common/CompactArray.h>
6#include <Common/HashTable/Hash.h>
7
8#include <IO/ReadBuffer.h>
9#include <IO/WriteBuffer.h>
10#include <IO/ReadHelpers.h>
11#include <IO/WriteHelpers.h>
12#include <Core/Defines.h>
13
14#include <cmath>
15#include <cstring>
16
17
/// The error code is only declared here (extern); its definition lives elsewhere.
/// It is used when HyperLogLogCounter encounters an impossible correction mode.
namespace DB { namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}}
25
26
/// Selects which representation of the HyperLogLog denominator is used.
enum class DenominatorMode
{
    /// Compact denominator.
    Compact = 0,
    /// Stable denominator falling back to Compact if rank storage is not big enough.
    StableIfBig = 1,
    /// Denominator of specified exact type.
    ExactType = 2
};
34
35namespace details
36{
37
38/// Look-up table of logarithms for integer numbers, used in HyperLogLogCounter.
39template <UInt8 K>
40struct LogLUT
41{
42 LogLUT()
43 {
44 log_table[0] = 0.0;
45 for (size_t i = 1; i <= M; ++i)
46 log_table[i] = log(static_cast<double>(i));
47 }
48
49 double getLog(size_t x) const
50 {
51 if (x <= M)
52 return log_table[x];
53 else
54 return log(static_cast<double>(x));
55 }
56
57private:
58 static constexpr size_t M = 1 << ((static_cast<unsigned int>(K) <= 12) ? K : 12);
59
60 double log_table[M + 1];
61};
62
63template <UInt8 K> struct MinCounterTypeHelper;
64template <> struct MinCounterTypeHelper<0> { using Type = UInt8; };
65template <> struct MinCounterTypeHelper<1> { using Type = UInt16; };
66template <> struct MinCounterTypeHelper<2> { using Type = UInt32; };
67template <> struct MinCounterTypeHelper<3> { using Type = UInt64; };
68
69/// Auxiliary structure for automatic determining minimum size of counter's type depending on its maximum value.
70/// Used in HyperLogLogCounter in order to spend memory efficiently.
71template <UInt64 MaxValue> struct MinCounterType
72{
73 using Type = typename MinCounterTypeHelper<
74 (MaxValue >= 1 << 8) +
75 (MaxValue >= 1 << 16) +
76 (MaxValue >= 1ULL << 32)
77 >::Type;
78};
79
80/// Denominator of expression for HyperLogLog algorithm.
81template <UInt8 precision, int max_rank, typename HashValueType, typename DenominatorType,
82 DenominatorMode denominator_mode, typename Enable = void>
83class __attribute__ ((packed)) Denominator;
84
85namespace
86{
87
88/// Returns true if rank storage is big.
89constexpr bool isBigRankStore(UInt8 precision)
90{
91 return precision >= 12;
92}
93
94}
95
96/// Used to deduce denominator type depending on options provided.
97template <typename HashValueType, typename DenominatorType, DenominatorMode denominator_mode, typename Enable = void>
98struct IntermediateDenominator;
99
100template <typename DenominatorType, DenominatorMode denominator_mode>
101struct IntermediateDenominator<UInt32, DenominatorType, denominator_mode, std::enable_if_t<denominator_mode != DenominatorMode::ExactType>>
102{
103 using Type = double;
104};
105
106template <typename DenominatorType, DenominatorMode denominator_mode>
107struct IntermediateDenominator<UInt64, DenominatorType, denominator_mode>
108{
109 using Type = long double;
110};
111
112template <typename HashValueType, typename DenominatorType>
113struct IntermediateDenominator<HashValueType, DenominatorType, DenominatorMode::ExactType>
114{
115 using Type = DenominatorType;
116};
117
118/// "Lightweight" implementation of expression's denominator for HyperLogLog algorithm.
119/// Uses minimum amount of memory, but estimates may be unstable.
120/// Satisfiable when rank storage is small enough.
121template <UInt8 precision, int max_rank, typename HashValueType, typename DenominatorType,
122 DenominatorMode denominator_mode>
123class __attribute__ ((packed)) Denominator<precision, max_rank, HashValueType, DenominatorType,
124 denominator_mode,
125 std::enable_if_t<!details::isBigRankStore(precision) || !(denominator_mode == DenominatorMode::StableIfBig)>>
126{
127private:
128 using T = typename IntermediateDenominator<HashValueType, DenominatorType, denominator_mode>::Type;
129
130public:
131 Denominator(DenominatorType initial_value)
132 : denominator(initial_value)
133 {
134 }
135
136public:
137 inline void update(UInt8 cur_rank, UInt8 new_rank)
138 {
139 denominator -= static_cast<T>(1.0) / (1ULL << cur_rank);
140 denominator += static_cast<T>(1.0) / (1ULL << new_rank);
141 }
142
143 inline void update(UInt8 rank)
144 {
145 denominator += static_cast<T>(1.0) / (1ULL << rank);
146 }
147
148 void clear()
149 {
150 denominator = 0;
151 }
152
153 DenominatorType get() const
154 {
155 return denominator;
156 }
157
158private:
159 T denominator;
160};
161
162/// Fully-functional version of expression's denominator for HyperLogLog algorithm.
163/// Spends more space that lightweight version. Estimates will always be stable.
164/// Used when rank storage is big.
165template <UInt8 precision, int max_rank, typename HashValueType, typename DenominatorType,
166 DenominatorMode denominator_mode>
167class __attribute__ ((packed)) Denominator<precision, max_rank, HashValueType, DenominatorType,
168 denominator_mode,
169 std::enable_if_t<details::isBigRankStore(precision) && denominator_mode == DenominatorMode::StableIfBig>>
170{
171public:
172 Denominator(DenominatorType initial_value)
173 {
174 rank_count[0] = initial_value;
175 }
176
177 inline void update(UInt8 cur_rank, UInt8 new_rank)
178 {
179 --rank_count[cur_rank];
180 ++rank_count[new_rank];
181 }
182
183 inline void update(UInt8 rank)
184 {
185 ++rank_count[rank];
186 }
187
188 void clear()
189 {
190 memset(rank_count, 0, size * sizeof(UInt32));
191 }
192
193 DenominatorType get() const
194 {
195 long double val = rank_count[size - 1];
196 for (int i = size - 2; i >= 0; --i)
197 {
198 val /= 2.0;
199 val += rank_count[i];
200 }
201 return val;
202 }
203
204private:
205 static constexpr size_t size = max_rank + 1;
206 UInt32 rank_count[size] = { 0 };
207};
208
209/// Number of trailing zeros.
210template <typename T>
211struct TrailingZerosCounter;
212
213template <>
214struct TrailingZerosCounter<UInt32>
215{
216 static int apply(UInt32 val)
217 {
218 return __builtin_ctz(val);
219 }
220};
221
222template <>
223struct TrailingZerosCounter<UInt64>
224{
225 static int apply(UInt64 val)
226 {
227 return __builtin_ctzll(val);
228 }
229};
230
231/// Size of counter's rank in bits.
232template <typename T>
233struct RankWidth;
234
235template <>
236struct RankWidth<UInt32>
237{
238 static constexpr UInt8 get()
239 {
240 return 5;
241 }
242};
243
244template <>
245struct RankWidth<UInt64>
246{
247 static constexpr UInt8 get()
248 {
249 return 6;
250 }
251};
252
253}
254
/// Selects which error correction HyperLogLogCounter applies to the raw estimate.
enum class HyperLogLogMode
{
    /// No error correction.
    Raw = 0,
    /// LinearCounting error correction.
    LinearCounting = 1,
    /// HyperLogLog++ error correction.
    BiasCorrected = 2,
    /// LinearCounting or HyperLogLog++ error correction (depending).
    FullFeatured = 3
};
263
264/// Estimation of number of unique values using HyperLogLog algorithm.
265///
266/// Theoretical relative error is ~1.04 / sqrt(2^precision), where
267/// precision is size of prefix of hash-function used for indexing (number of buckets M = 2^precision).
268/// Recommended values for precision are: 3..20.
269///
270/// Source: "HyperLogLog: The analysis of a near-optimal cardinality estimation algorithm"
271/// (P. Flajolet et al., AOFA '07: Proceedings of the 2007 International Conference on Analysis
272/// of Algorithms).
273template <
274 UInt8 precision,
275 typename Hash = IntHash32<UInt64>,
276 typename HashValueType = UInt32,
277 typename DenominatorType = double,
278 typename BiasEstimator = TrivialBiasEstimator,
279 HyperLogLogMode mode = HyperLogLogMode::FullFeatured,
280 DenominatorMode denominator_mode = DenominatorMode::StableIfBig>
281class HyperLogLogCounter : private Hash
282{
283private:
284 /// Number of buckets.
285 static constexpr size_t bucket_count = 1ULL << precision;
286
287 /// Size of counter's rank in bits.
288 static constexpr UInt8 rank_width = details::RankWidth<HashValueType>::get();
289
290 using Value = UInt64;
291 using RankStore = DB::CompactArray<HashValueType, rank_width, bucket_count>;
292
293public:
294 using value_type = Value;
295
296 /// ALWAYS_INLINE is required to have better code layout for uniqCombined function
297 void ALWAYS_INLINE insert(Value value)
298 {
299 HashValueType hash = getHash(value);
300
301 /// Divide hash to two sub-values. First is bucket number, second will be used to calculate rank.
302 HashValueType bucket = extractBitSequence(hash, 0, precision);
303 HashValueType tail = extractBitSequence(hash, precision, sizeof(HashValueType) * 8);
304 UInt8 rank = calculateRank(tail);
305
306 /// Update maximum rank for current bucket.
307 update(bucket, rank);
308 }
309
310 UInt64 size() const
311 {
312 /// Normalizing factor for harmonic mean.
313 static constexpr double alpha_m =
314 bucket_count == 2 ? 0.351 :
315 bucket_count == 4 ? 0.532 :
316 bucket_count == 8 ? 0.626 :
317 bucket_count == 16 ? 0.673 :
318 bucket_count == 32 ? 0.697 :
319 bucket_count == 64 ? 0.709 : 0.7213 / (1 + 1.079 / bucket_count);
320
321 /// Harmonic mean for all buckets of 2^rank values is: bucket_count / ∑ 2^-rank_i,
322 /// where ∑ 2^-rank_i - is denominator.
323
324 double raw_estimate = alpha_m * bucket_count * bucket_count / denominator.get();
325
326 double final_estimate = fixRawEstimate(raw_estimate);
327
328 return static_cast<UInt64>(final_estimate + 0.5);
329 }
330
331 void merge(const HyperLogLogCounter & rhs)
332 {
333 const auto & rhs_rank_store = rhs.rank_store;
334 for (HashValueType bucket = 0; bucket < bucket_count; ++bucket)
335 update(bucket, rhs_rank_store[bucket]);
336 }
337
338 void read(DB::ReadBuffer & in)
339 {
340 in.readStrict(reinterpret_cast<char *>(this), sizeof(*this));
341 }
342
343 void readAndMerge(DB::ReadBuffer & in)
344 {
345 typename RankStore::Reader reader(in);
346 while (reader.next())
347 {
348 const auto & data = reader.get();
349 update(data.first, data.second);
350 }
351
352 in.ignore(sizeof(DenominatorCalculatorType) + sizeof(ZerosCounterType));
353 }
354
355 static void skip(DB::ReadBuffer & in)
356 {
357 in.ignore(sizeof(RankStore) + sizeof(DenominatorCalculatorType) + sizeof(ZerosCounterType));
358 }
359
360 void write(DB::WriteBuffer & out) const
361 {
362 out.write(reinterpret_cast<const char *>(this), sizeof(*this));
363 }
364
365 /// Read and write in text mode is suboptimal (but compatible with OLAPServer and Metrage).
366 void readText(DB::ReadBuffer & in)
367 {
368 rank_store.readText(in);
369
370 zeros = 0;
371 denominator.clear();
372 for (HashValueType bucket = 0; bucket < bucket_count; ++bucket)
373 {
374 UInt8 rank = rank_store[bucket];
375 if (rank == 0)
376 ++zeros;
377 denominator.update(rank);
378 }
379 }
380
381 static void skipText(DB::ReadBuffer & in)
382 {
383 UInt8 dummy;
384 for (size_t i = 0; i < RankStore::size(); ++i)
385 {
386 if (i != 0)
387 DB::assertChar(',', in);
388 DB::readIntText(dummy, in);
389 }
390 }
391
392 void writeText(DB::WriteBuffer & out) const
393 {
394 rank_store.writeText(out);
395 }
396
397private:
398 /// Extract subset of bits in [begin, end[ range.
399 inline HashValueType extractBitSequence(HashValueType val, UInt8 begin, UInt8 end) const
400 {
401 return (val >> begin) & ((1ULL << (end - begin)) - 1);
402 }
403
404 /// Rank is number of trailing zeros.
405 inline UInt8 calculateRank(HashValueType val) const
406 {
407 if (unlikely(val == 0))
408 return max_rank;
409
410 auto zeros_plus_one = details::TrailingZerosCounter<HashValueType>::apply(val) + 1;
411
412 if (unlikely(zeros_plus_one) > max_rank)
413 return max_rank;
414
415 return zeros_plus_one;
416 }
417
418 inline HashValueType getHash(Value key) const
419 {
420 return Hash::operator()(key);
421 }
422
423 /// Update maximum rank for current bucket.
424 /// ALWAYS_INLINE is required to have better code layout for uniqCombined function
425 void ALWAYS_INLINE update(HashValueType bucket, UInt8 rank)
426 {
427 typename RankStore::Locus content = rank_store[bucket];
428 UInt8 cur_rank = static_cast<UInt8>(content);
429
430 if (rank > cur_rank)
431 {
432 if (cur_rank == 0)
433 --zeros;
434 denominator.update(cur_rank, rank);
435 content = rank;
436 }
437 }
438
439 double fixRawEstimate(double raw_estimate) const
440 {
441 if ((mode == HyperLogLogMode::Raw) || ((mode == HyperLogLogMode::BiasCorrected) && BiasEstimator::isTrivial()))
442 return raw_estimate;
443 else if (mode == HyperLogLogMode::LinearCounting)
444 return applyLinearCorrection(raw_estimate);
445 else if ((mode == HyperLogLogMode::BiasCorrected) && !BiasEstimator::isTrivial())
446 return applyBiasCorrection(raw_estimate);
447 else if (mode == HyperLogLogMode::FullFeatured)
448 {
449 static constexpr double pow2_32 = 4294967296.0;
450
451 double fixed_estimate;
452
453 if (raw_estimate > (pow2_32 / 30.0))
454 fixed_estimate = raw_estimate;
455 else
456 fixed_estimate = applyCorrection(raw_estimate);
457
458 return fixed_estimate;
459 }
460 else
461 throw Poco::Exception("Internal error", DB::ErrorCodes::LOGICAL_ERROR);
462 }
463
464 inline double applyCorrection(double raw_estimate) const
465 {
466 double fixed_estimate;
467
468 if (BiasEstimator::isTrivial())
469 {
470 if (raw_estimate <= (2.5 * bucket_count))
471 {
472 /// Correction in case of small estimate.
473 fixed_estimate = applyLinearCorrection(raw_estimate);
474 }
475 else
476 fixed_estimate = raw_estimate;
477 }
478 else
479 {
480 fixed_estimate = applyBiasCorrection(raw_estimate);
481 double linear_estimate = applyLinearCorrection(fixed_estimate);
482
483 if (linear_estimate < BiasEstimator::getThreshold())
484 fixed_estimate = linear_estimate;
485 }
486
487 return fixed_estimate;
488 }
489
490 /// Correction used in HyperLogLog++ algorithm.
491 /// Source: "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm"
492 /// (S. Heule et al., Proceedings of the EDBT 2013 Conference).
493 inline double applyBiasCorrection(double raw_estimate) const
494 {
495 double fixed_estimate;
496
497 if (raw_estimate <= (5 * bucket_count))
498 fixed_estimate = raw_estimate - BiasEstimator::getBias(raw_estimate);
499 else
500 fixed_estimate = raw_estimate;
501
502 return fixed_estimate;
503 }
504
505 /// Calculation of unique values using LinearCounting algorithm.
506 /// Source: "A Linear-time Probabilistic Counting Algorithm for Database Applications"
507 /// (Whang et al., ACM Trans. Database Syst., pp. 208-229, 1990).
508 inline double applyLinearCorrection(double raw_estimate) const
509 {
510 double fixed_estimate;
511
512 if (zeros != 0)
513 fixed_estimate = bucket_count * (log_lut.getLog(bucket_count) - log_lut.getLog(zeros));
514 else
515 fixed_estimate = raw_estimate;
516
517 return fixed_estimate;
518 }
519
520private:
521 static constexpr int max_rank = sizeof(HashValueType) * 8 - precision + 1;
522
523 RankStore rank_store;
524
525 /// Expression's denominator for HyperLogLog algorithm.
526 using DenominatorCalculatorType = details::Denominator<precision, max_rank, HashValueType, DenominatorType, denominator_mode>;
527 DenominatorCalculatorType denominator{bucket_count};
528
529 /// Number of zeros in rank storage.
530 using ZerosCounterType = typename details::MinCounterType<bucket_count>::Type;
531 ZerosCounterType zeros = bucket_count;
532
533 static details::LogLUT<precision> log_lut;
534
535 /// Checks.
536 static_assert(precision < (sizeof(HashValueType) * 8), "Invalid parameter value");
537};
538
539
540/// Declaration of static variables for linker.
541template
542<
543 UInt8 precision,
544 typename Hash,
545 typename HashValueType,
546 typename DenominatorType,
547 typename BiasEstimator,
548 HyperLogLogMode mode,
549 DenominatorMode denominator_mode
550>
551details::LogLUT<precision> HyperLogLogCounter
552<
553 precision,
554 Hash,
555 HashValueType,
556 DenominatorType,
557 BiasEstimator,
558 mode,
559 denominator_mode
560>::log_lut;
561
562
563/// Lightweight implementation of expression's denominator is used in Metrage.
564/// Serialization format must not be changed.
565using HLL12 = HyperLogLogCounter<
566 12,
567 IntHash32<UInt64>,
568 UInt32,
569 double,
570 TrivialBiasEstimator,
571 HyperLogLogMode::FullFeatured,
572 DenominatorMode::Compact
573>;
574