1 | #pragma once |
2 | |
3 | #include <Common/CombinedCardinalityEstimator.h> |
4 | #include <Common/FieldVisitors.h> |
5 | #include <Common/SipHash.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include <Common/assert_cast.h> |
8 | |
9 | #include <DataTypes/DataTypeTuple.h> |
10 | #include <DataTypes/DataTypeUUID.h> |
11 | #include <DataTypes/DataTypesNumber.h> |
12 | |
13 | #include <AggregateFunctions/IAggregateFunction.h> |
14 | #include <AggregateFunctions/UniqCombinedBiasData.h> |
15 | #include <AggregateFunctions/UniqVariadicHash.h> |
16 | |
17 | #include <ext/bit_cast.h> |
18 | |
19 | #include <Columns/ColumnVector.h> |
20 | #include <Columns/ColumnsNumber.h> |
21 | |
22 | |
23 | namespace DB |
24 | { |
25 | namespace detail |
26 | { |
27 | /** Hash function for uniqCombined/uniqCombined64 (based on Ret). |
28 | */ |
29 | template <typename T, typename Ret> |
30 | struct AggregateFunctionUniqCombinedTraits |
31 | { |
32 | static Ret hash(T x) |
33 | { |
34 | return static_cast<Ret>(intHash64(x)); |
35 | } |
36 | }; |
37 | |
38 | template <typename Ret> |
39 | struct AggregateFunctionUniqCombinedTraits<UInt128, Ret> |
40 | { |
41 | static Ret hash(UInt128 x) |
42 | { |
43 | return sipHash64(x); |
44 | } |
45 | }; |
46 | |
47 | template <typename Ret> |
48 | struct AggregateFunctionUniqCombinedTraits<Float32, Ret> |
49 | { |
50 | static Ret hash(Float32 x) |
51 | { |
52 | UInt64 res = ext::bit_cast<UInt64>(x); |
53 | return static_cast<Ret>(intHash64(res)); |
54 | } |
55 | }; |
56 | |
57 | template <typename Ret> |
58 | struct AggregateFunctionUniqCombinedTraits<Float64, Ret> |
59 | { |
60 | static Ret hash(Float64 x) |
61 | { |
62 | UInt64 res = ext::bit_cast<UInt64>(x); |
63 | return static_cast<Ret>(intHash64(res)); |
64 | } |
65 | }; |
66 | |
67 | } |
68 | |
69 | // Unlike HashTableGrower always grows to power of 2. |
70 | struct UniqCombinedHashTableGrower : public HashTableGrower<> |
71 | { |
72 | void increaseSize() { ++size_degree; } |
73 | }; |
74 | |
75 | template <typename Key, UInt8 K> |
76 | struct AggregateFunctionUniqCombinedDataWithKey |
77 | { |
78 | // TODO(ilezhankin): pre-generate values for |UniqCombinedBiasData|, |
79 | // at the moment gen-bias-data.py script doesn't work. |
80 | |
81 | // We want to migrate from |HashSet| to |HyperLogLogCounter| when the sizes in memory become almost equal. |
82 | // The size per element in |HashSet| is sizeof(Key)*2 bytes, and the overall size of |HyperLogLogCounter| is 2^K * 6 bits. |
83 | // For Key=UInt32 we can calculate: 2^X * 4 * 2 ≤ 2^(K-3) * 6 ⇒ X ≤ K-4. |
84 | using Set = CombinedCardinalityEstimator<Key, HashSet<Key, TrivialHash, UniqCombinedHashTableGrower>, 16, K - 5 + (sizeof(Key) == sizeof(UInt32)), K, TrivialHash, Key>; |
85 | |
86 | Set set; |
87 | }; |
88 | |
89 | template <typename Key> |
90 | struct AggregateFunctionUniqCombinedDataWithKey<Key, 17> |
91 | { |
92 | using Set = CombinedCardinalityEstimator<Key, |
93 | HashSet<Key, TrivialHash, UniqCombinedHashTableGrower>, |
94 | 16, |
95 | 12 + (sizeof(Key) == sizeof(UInt32)), |
96 | 17, |
97 | TrivialHash, |
98 | Key, |
99 | HyperLogLogBiasEstimator<UniqCombinedBiasData>, |
100 | HyperLogLogMode::FullFeatured>; |
101 | |
102 | Set set; |
103 | }; |
104 | |
105 | |
106 | template <typename T, UInt8 K, typename HashValueType> |
107 | struct AggregateFunctionUniqCombinedData : public AggregateFunctionUniqCombinedDataWithKey<HashValueType, K> |
108 | { |
109 | }; |
110 | |
111 | |
112 | /// For String keys, 64 bit hash is always used (both for uniqCombined and uniqCombined64), |
113 | /// because of backwards compatibility (64 bit hash was already used for uniqCombined). |
114 | template <UInt8 K, typename HashValueType> |
115 | struct AggregateFunctionUniqCombinedData<String, K, HashValueType> : public AggregateFunctionUniqCombinedDataWithKey<UInt64 /*always*/, K> |
116 | { |
117 | }; |
118 | |
119 | |
120 | template <typename T, UInt8 K, typename HashValueType> |
121 | class AggregateFunctionUniqCombined final |
122 | : public IAggregateFunctionDataHelper<AggregateFunctionUniqCombinedData<T, K, HashValueType>, AggregateFunctionUniqCombined<T, K, HashValueType>> |
123 | { |
124 | public: |
125 | AggregateFunctionUniqCombined(const DataTypes & argument_types_, const Array & params_) |
126 | : IAggregateFunctionDataHelper<AggregateFunctionUniqCombinedData<T, K, HashValueType>, AggregateFunctionUniqCombined<T, K, HashValueType>>(argument_types_, params_) {} |
127 | |
128 | String getName() const override |
129 | { |
130 | if constexpr (std::is_same_v<HashValueType, UInt64>) |
131 | return "uniqCombined64" ; |
132 | else |
133 | return "uniqCombined" ; |
134 | } |
135 | |
136 | DataTypePtr getReturnType() const override |
137 | { |
138 | return std::make_shared<DataTypeUInt64>(); |
139 | } |
140 | |
141 | void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override |
142 | { |
143 | if constexpr (!std::is_same_v<T, String>) |
144 | { |
145 | const auto & value = assert_cast<const ColumnVector<T> &>(*columns[0]).getElement(row_num); |
146 | this->data(place).set.insert(detail::AggregateFunctionUniqCombinedTraits<T, HashValueType>::hash(value)); |
147 | } |
148 | else |
149 | { |
150 | StringRef value = columns[0]->getDataAt(row_num); |
151 | this->data(place).set.insert(CityHash_v1_0_2::CityHash64(value.data, value.size)); |
152 | } |
153 | } |
154 | |
155 | void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override |
156 | { |
157 | this->data(place).set.merge(this->data(rhs).set); |
158 | } |
159 | |
160 | void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override |
161 | { |
162 | this->data(place).set.write(buf); |
163 | } |
164 | |
165 | void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override |
166 | { |
167 | this->data(place).set.read(buf); |
168 | } |
169 | |
170 | void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override |
171 | { |
172 | assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size()); |
173 | } |
174 | }; |
175 | |
176 | /** For multiple arguments. To compute, hashes them. |
177 | * You can pass multiple arguments as is; You can also pass one argument - a tuple. |
178 | * But (for the possibility of efficient implementation), you can not pass several arguments, among which there are tuples. |
179 | */ |
180 | template <bool is_exact, bool argument_is_tuple, UInt8 K, typename HashValueType> |
181 | class AggregateFunctionUniqCombinedVariadic final : public IAggregateFunctionDataHelper<AggregateFunctionUniqCombinedData<UInt64, K, HashValueType>, |
182 | AggregateFunctionUniqCombinedVariadic<is_exact, argument_is_tuple, K, HashValueType>> |
183 | { |
184 | private: |
185 | size_t num_args = 0; |
186 | |
187 | public: |
188 | explicit AggregateFunctionUniqCombinedVariadic(const DataTypes & arguments, const Array & params) |
189 | : IAggregateFunctionDataHelper<AggregateFunctionUniqCombinedData<UInt64, K, HashValueType>, |
190 | AggregateFunctionUniqCombinedVariadic<is_exact, argument_is_tuple, K, HashValueType>>(arguments, params) |
191 | { |
192 | if (argument_is_tuple) |
193 | num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size(); |
194 | else |
195 | num_args = arguments.size(); |
196 | } |
197 | |
198 | String getName() const override |
199 | { |
200 | if constexpr (std::is_same_v<HashValueType, UInt64>) |
201 | return "uniqCombined64" ; |
202 | else |
203 | return "uniqCombined" ; |
204 | } |
205 | |
206 | DataTypePtr getReturnType() const override |
207 | { |
208 | return std::make_shared<DataTypeUInt64>(); |
209 | } |
210 | |
211 | void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override |
212 | { |
213 | this->data(place).set.insert(typename AggregateFunctionUniqCombinedData<UInt64, K, HashValueType>::Set::value_type( |
214 | UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num))); |
215 | } |
216 | |
217 | void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override |
218 | { |
219 | this->data(place).set.merge(this->data(rhs).set); |
220 | } |
221 | |
222 | void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override |
223 | { |
224 | this->data(place).set.write(buf); |
225 | } |
226 | |
227 | void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override |
228 | { |
229 | this->data(place).set.read(buf); |
230 | } |
231 | |
232 | void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override |
233 | { |
234 | assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size()); |
235 | } |
236 | }; |
237 | |
238 | } |
239 | |