1 | #pragma once |
2 | |
3 | #include <Common/FieldVisitors.h> |
4 | #include <Common/typeid_cast.h> |
5 | #include <Common/assert_cast.h> |
6 | |
7 | #include <AggregateFunctions/IAggregateFunction.h> |
8 | #include <AggregateFunctions/UniqVariadicHash.h> |
9 | |
10 | #include <DataTypes/DataTypesNumber.h> |
11 | #include <DataTypes/DataTypeTuple.h> |
12 | #include <DataTypes/DataTypeUUID.h> |
13 | |
14 | #include <Columns/ColumnsNumber.h> |
15 | |
16 | #include <IO/ReadHelpers.h> |
17 | #include <IO/WriteHelpers.h> |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | |
23 | |
/** Counts the number of unique values, up to the limit given in the parameter.
  *
  * Example: uniqUpTo(3)(UserID)
  *  - counts the number of unique visitors and returns 1, 2 or 3, or 4 if there are >= 4 unique visitors.
  *
  * For strings, a non-cryptographic hash function is used, so the result may be slightly inaccurate.
  */
31 | |
32 | template <typename T> |
33 | struct __attribute__((__packed__)) AggregateFunctionUniqUpToData |
34 | { |
35 | /** If count == threshold + 1 - this means that it is "overflowed" (values greater than threshold). |
36 | * In this case (for example, after calling the merge function), the `data` array does not necessarily contain the initialized values |
37 | * - example: combine a state in which there are few values, with another state that has overflowed; |
38 | * then set count to `threshold + 1`, and values from another state are not copied. |
39 | */ |
40 | UInt8 count = 0; |
41 | |
42 | T data[0]; |
43 | |
44 | |
45 | size_t size() const |
46 | { |
47 | return count; |
48 | } |
49 | |
50 | /// threshold - for how many elements there is room in a `data`. |
51 | /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function |
52 | void ALWAYS_INLINE insert(T x, UInt8 threshold) |
53 | { |
54 | /// The state is already full - nothing needs to be done. |
55 | if (count > threshold) |
56 | return; |
57 | |
58 | /// Linear search for the matching element. |
59 | for (size_t i = 0; i < count; ++i) |
60 | if (data[i] == x) |
61 | return; |
62 | |
63 | /// Did not find the matching element. If there is room for one more element, insert it. |
64 | if (count < threshold) |
65 | data[count] = x; |
66 | |
67 | /// After increasing count, the state may be overflowed. |
68 | ++count; |
69 | } |
70 | |
71 | void merge(const AggregateFunctionUniqUpToData<T> & rhs, UInt8 threshold) |
72 | { |
73 | if (count > threshold) |
74 | return; |
75 | |
76 | if (rhs.count > threshold) |
77 | { |
78 | /// If `rhs` is overflowed, then set `count` too also overflowed for the current state. |
79 | count = rhs.count; |
80 | return; |
81 | } |
82 | |
83 | for (size_t i = 0; i < rhs.count; ++i) |
84 | insert(rhs.data[i], threshold); |
85 | } |
86 | |
87 | void write(WriteBuffer & wb, UInt8 threshold) const |
88 | { |
89 | writeBinary(count, wb); |
90 | |
91 | /// Write values only if the state is not overflowed. Otherwise, they are not needed, and only the fact that the state is overflowed is important. |
92 | if (count <= threshold) |
93 | wb.write(reinterpret_cast<const char *>(data), count * sizeof(data[0])); |
94 | } |
95 | |
96 | void read(ReadBuffer & rb, UInt8 threshold) |
97 | { |
98 | readBinary(count, rb); |
99 | |
100 | if (count <= threshold) |
101 | rb.read(reinterpret_cast<char *>(data), count * sizeof(data[0])); |
102 | } |
103 | |
104 | /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function |
105 | void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold) |
106 | { |
107 | insert(assert_cast<const ColumnVector<T> &>(column).getData()[row_num], threshold); |
108 | } |
109 | }; |
110 | |
111 | |
112 | /// For strings, their hashes are remembered. |
113 | template <> |
114 | struct AggregateFunctionUniqUpToData<String> : AggregateFunctionUniqUpToData<UInt64> |
115 | { |
116 | /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function |
117 | void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold) |
118 | { |
119 | /// Keep in mind that calculations are approximate. |
120 | StringRef value = column.getDataAt(row_num); |
121 | insert(CityHash_v1_0_2::CityHash64(value.data, value.size), threshold); |
122 | } |
123 | }; |
124 | |
125 | template <> |
126 | struct AggregateFunctionUniqUpToData<UInt128> : AggregateFunctionUniqUpToData<UInt64> |
127 | { |
128 | /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function |
129 | void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold) |
130 | { |
131 | UInt128 value = assert_cast<const ColumnVector<UInt128> &>(column).getData()[row_num]; |
132 | insert(sipHash64(value), threshold); |
133 | } |
134 | }; |
135 | |
136 | |
137 | template <typename T> |
138 | class AggregateFunctionUniqUpTo final : public IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T>> |
139 | { |
140 | private: |
141 | UInt8 threshold; |
142 | |
143 | public: |
144 | AggregateFunctionUniqUpTo(UInt8 threshold_, const DataTypes & argument_types_, const Array & params_) |
145 | : IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T>>(argument_types_, params_) |
146 | , threshold(threshold_) |
147 | { |
148 | } |
149 | |
150 | size_t sizeOfData() const override |
151 | { |
152 | return sizeof(AggregateFunctionUniqUpToData<T>) + sizeof(T) * threshold; |
153 | } |
154 | |
155 | String getName() const override { return "uniqUpTo" ; } |
156 | |
157 | DataTypePtr getReturnType() const override |
158 | { |
159 | return std::make_shared<DataTypeUInt64>(); |
160 | } |
161 | |
162 | /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function |
163 | void ALWAYS_INLINE add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override |
164 | { |
165 | this->data(place).add(*columns[0], row_num, threshold); |
166 | } |
167 | |
168 | void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override |
169 | { |
170 | this->data(place).merge(this->data(rhs), threshold); |
171 | } |
172 | |
173 | void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override |
174 | { |
175 | this->data(place).write(buf, threshold); |
176 | } |
177 | |
178 | void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override |
179 | { |
180 | this->data(place).read(buf, threshold); |
181 | } |
182 | |
183 | void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override |
184 | { |
185 | assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size()); |
186 | } |
187 | }; |
188 | |
189 | |
190 | /** For multiple arguments. To compute, hashes them. |
191 | * You can pass multiple arguments as is; You can also pass one argument - a tuple. |
192 | * But (for the possibility of effective implementation), you can not pass several arguments, among which there are tuples. |
193 | */ |
194 | template <bool is_exact, bool argument_is_tuple> |
195 | class AggregateFunctionUniqUpToVariadic final |
196 | : public IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<UInt64>, AggregateFunctionUniqUpToVariadic<is_exact, argument_is_tuple>> |
197 | { |
198 | private: |
199 | size_t num_args = 0; |
200 | UInt8 threshold; |
201 | |
202 | public: |
203 | AggregateFunctionUniqUpToVariadic(const DataTypes & arguments, const Array & params, UInt8 threshold_) |
204 | : IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<UInt64>, AggregateFunctionUniqUpToVariadic<is_exact, argument_is_tuple>>(arguments, params) |
205 | , threshold(threshold_) |
206 | { |
207 | if (argument_is_tuple) |
208 | num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size(); |
209 | else |
210 | num_args = arguments.size(); |
211 | } |
212 | |
213 | size_t sizeOfData() const override |
214 | { |
215 | return sizeof(AggregateFunctionUniqUpToData<UInt64>) + sizeof(UInt64) * threshold; |
216 | } |
217 | |
218 | String getName() const override { return "uniqUpTo" ; } |
219 | |
220 | DataTypePtr getReturnType() const override |
221 | { |
222 | return std::make_shared<DataTypeUInt64>(); |
223 | } |
224 | |
225 | void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override |
226 | { |
227 | this->data(place).insert(UInt64(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num)), threshold); |
228 | } |
229 | |
230 | void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override |
231 | { |
232 | this->data(place).merge(this->data(rhs), threshold); |
233 | } |
234 | |
235 | void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override |
236 | { |
237 | this->data(place).write(buf, threshold); |
238 | } |
239 | |
240 | void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override |
241 | { |
242 | this->data(place).read(buf, threshold); |
243 | } |
244 | |
245 | void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override |
246 | { |
247 | assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size()); |
248 | } |
249 | }; |
250 | |
251 | |
252 | } |
253 | |