1#pragma once
2
#include <memory>
#include <utility>

#include <boost/noncopyable.hpp>

#include <Common/HyperLogLogCounter.h>
#include <Common/HashTable/SmallTable.h>
7
8
9namespace DB
10{
11
12
13/** For a small number of keys - an array of fixed size "on the stack".
14 * For large, HyperLogLog is allocated.
15 * See also the more practical implementation in CombinedCardinalityEstimator.h,
16 * where a hash table is also used for medium-sized sets.
17 */
18template
19<
20 typename Key,
21 UInt8 small_set_size,
22 UInt8 K,
23 typename Hash = IntHash32<Key>,
24 typename DenominatorType = double>
25class HyperLogLogWithSmallSetOptimization : private boost::noncopyable
26{
27private:
28 using Small = SmallSet<Key, small_set_size>;
29 using Large = HyperLogLogCounter<K, Hash, UInt32, DenominatorType>;
30
31 Small small;
32 Large * large = nullptr;
33
34 bool isLarge() const
35 {
36 return large != nullptr;
37 }
38
39 void toLarge()
40 {
41 /// At the time of copying data from `tiny`, setting the value of `large` is still not possible (otherwise it will overwrite some data).
42 Large * tmp_large = new Large;
43
44 for (const auto & x : small)
45 tmp_large->insert(x.getValue());
46
47 large = tmp_large;
48 }
49
50public:
51 using value_type = Key;
52
53 ~HyperLogLogWithSmallSetOptimization()
54 {
55 if (isLarge())
56 delete large;
57 }
58
59 /// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function
60 void ALWAYS_INLINE insert(Key value)
61 {
62 if (!isLarge())
63 {
64 if (small.find(value) == small.end())
65 {
66 if (!small.full())
67 small.insert(value);
68 else
69 {
70 toLarge();
71 large->insert(value);
72 }
73 }
74 }
75 else
76 large->insert(value);
77 }
78
79 UInt64 size() const
80 {
81 return !isLarge() ? small.size() : large->size();
82 }
83
84 void merge(const HyperLogLogWithSmallSetOptimization & rhs)
85 {
86 if (rhs.isLarge())
87 {
88 if (!isLarge())
89 toLarge();
90
91 large->merge(*rhs.large);
92 }
93 else
94 {
95 for (const auto & x : rhs.small)
96 insert(x.getValue());
97 }
98 }
99
100 /// You can only call for an empty object.
101 void read(DB::ReadBuffer & in)
102 {
103 bool is_large;
104 readBinary(is_large, in);
105
106 if (is_large)
107 {
108 toLarge();
109 large->read(in);
110 }
111 else
112 small.read(in);
113 }
114
115 void readAndMerge(DB::ReadBuffer & in)
116 {
117 bool is_rhs_large;
118 readBinary(is_rhs_large, in);
119
120 if (!isLarge() && is_rhs_large)
121 toLarge();
122
123 if (!is_rhs_large)
124 {
125 typename Small::Reader reader(in);
126 while (reader.next())
127 insert(reader.get());
128 }
129 else
130 large->readAndMerge(in);
131 }
132
133 void write(DB::WriteBuffer & out) const
134 {
135 writeBinary(isLarge(), out);
136
137 if (isLarge())
138 large->write(out);
139 else
140 small.write(out);
141 }
142};
143
144
145}
146