1#pragma once
2
3#include <Common/FieldVisitors.h>
4#include <Common/typeid_cast.h>
5#include <Common/assert_cast.h>
6
7#include <AggregateFunctions/IAggregateFunction.h>
8#include <AggregateFunctions/UniqVariadicHash.h>
9
10#include <DataTypes/DataTypesNumber.h>
11#include <DataTypes/DataTypeTuple.h>
12#include <DataTypes/DataTypeUUID.h>
13
14#include <Columns/ColumnsNumber.h>
15
16#include <IO/ReadHelpers.h>
17#include <IO/WriteHelpers.h>
18
19
20namespace DB
21{
22
23
/** Counts the number of unique values, but no more than the limit specified in the parameter.
 *
 * Example: uniqUpTo(3)(UserID)
 * - returns the number of unique visitors: 0, 1, 2 or 3, or 4 if there are 4 or more visitors.
 *
 * For strings, a non-cryptographic hash function is used, so the result may be slightly inaccurate.
 */
31
32template <typename T>
33struct __attribute__((__packed__)) AggregateFunctionUniqUpToData
34{
35/** If count == threshold + 1 - this means that it is "overflowed" (values greater than threshold).
36 * In this case (for example, after calling the merge function), the `data` array does not necessarily contain the initialized values
37 * - example: combine a state in which there are few values, with another state that has overflowed;
38 * then set count to `threshold + 1`, and values from another state are not copied.
39 */
40 UInt8 count = 0;
41
42 T data[0];
43
44
45 size_t size() const
46 {
47 return count;
48 }
49
50 /// threshold - for how many elements there is room in a `data`.
51 /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
52 void ALWAYS_INLINE insert(T x, UInt8 threshold)
53 {
54 /// The state is already full - nothing needs to be done.
55 if (count > threshold)
56 return;
57
58 /// Linear search for the matching element.
59 for (size_t i = 0; i < count; ++i)
60 if (data[i] == x)
61 return;
62
63 /// Did not find the matching element. If there is room for one more element, insert it.
64 if (count < threshold)
65 data[count] = x;
66
67 /// After increasing count, the state may be overflowed.
68 ++count;
69 }
70
71 void merge(const AggregateFunctionUniqUpToData<T> & rhs, UInt8 threshold)
72 {
73 if (count > threshold)
74 return;
75
76 if (rhs.count > threshold)
77 {
78 /// If `rhs` is overflowed, then set `count` too also overflowed for the current state.
79 count = rhs.count;
80 return;
81 }
82
83 for (size_t i = 0; i < rhs.count; ++i)
84 insert(rhs.data[i], threshold);
85 }
86
87 void write(WriteBuffer & wb, UInt8 threshold) const
88 {
89 writeBinary(count, wb);
90
91 /// Write values only if the state is not overflowed. Otherwise, they are not needed, and only the fact that the state is overflowed is important.
92 if (count <= threshold)
93 wb.write(reinterpret_cast<const char *>(data), count * sizeof(data[0]));
94 }
95
96 void read(ReadBuffer & rb, UInt8 threshold)
97 {
98 readBinary(count, rb);
99
100 if (count <= threshold)
101 rb.read(reinterpret_cast<char *>(data), count * sizeof(data[0]));
102 }
103
104 /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
105 void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold)
106 {
107 insert(assert_cast<const ColumnVector<T> &>(column).getData()[row_num], threshold);
108 }
109};
110
111
112/// For strings, their hashes are remembered.
113template <>
114struct AggregateFunctionUniqUpToData<String> : AggregateFunctionUniqUpToData<UInt64>
115{
116 /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
117 void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold)
118 {
119 /// Keep in mind that calculations are approximate.
120 StringRef value = column.getDataAt(row_num);
121 insert(CityHash_v1_0_2::CityHash64(value.data, value.size), threshold);
122 }
123};
124
125template <>
126struct AggregateFunctionUniqUpToData<UInt128> : AggregateFunctionUniqUpToData<UInt64>
127{
128 /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
129 void ALWAYS_INLINE add(const IColumn & column, size_t row_num, UInt8 threshold)
130 {
131 UInt128 value = assert_cast<const ColumnVector<UInt128> &>(column).getData()[row_num];
132 insert(sipHash64(value), threshold);
133 }
134};
135
136
137template <typename T>
138class AggregateFunctionUniqUpTo final : public IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T>>
139{
140private:
141 UInt8 threshold;
142
143public:
144 AggregateFunctionUniqUpTo(UInt8 threshold_, const DataTypes & argument_types_, const Array & params_)
145 : IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<T>, AggregateFunctionUniqUpTo<T>>(argument_types_, params_)
146 , threshold(threshold_)
147 {
148 }
149
150 size_t sizeOfData() const override
151 {
152 return sizeof(AggregateFunctionUniqUpToData<T>) + sizeof(T) * threshold;
153 }
154
155 String getName() const override { return "uniqUpTo"; }
156
157 DataTypePtr getReturnType() const override
158 {
159 return std::make_shared<DataTypeUInt64>();
160 }
161
162 /// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
163 void ALWAYS_INLINE add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
164 {
165 this->data(place).add(*columns[0], row_num, threshold);
166 }
167
168 void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
169 {
170 this->data(place).merge(this->data(rhs), threshold);
171 }
172
173 void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
174 {
175 this->data(place).write(buf, threshold);
176 }
177
178 void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
179 {
180 this->data(place).read(buf, threshold);
181 }
182
183 void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
184 {
185 assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
186 }
187};
188
189
190/** For multiple arguments. To compute, hashes them.
191 * You can pass multiple arguments as is; You can also pass one argument - a tuple.
192 * But (for the possibility of effective implementation), you can not pass several arguments, among which there are tuples.
193 */
194template <bool is_exact, bool argument_is_tuple>
195class AggregateFunctionUniqUpToVariadic final
196 : public IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<UInt64>, AggregateFunctionUniqUpToVariadic<is_exact, argument_is_tuple>>
197{
198private:
199 size_t num_args = 0;
200 UInt8 threshold;
201
202public:
203 AggregateFunctionUniqUpToVariadic(const DataTypes & arguments, const Array & params, UInt8 threshold_)
204 : IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<UInt64>, AggregateFunctionUniqUpToVariadic<is_exact, argument_is_tuple>>(arguments, params)
205 , threshold(threshold_)
206 {
207 if (argument_is_tuple)
208 num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size();
209 else
210 num_args = arguments.size();
211 }
212
213 size_t sizeOfData() const override
214 {
215 return sizeof(AggregateFunctionUniqUpToData<UInt64>) + sizeof(UInt64) * threshold;
216 }
217
218 String getName() const override { return "uniqUpTo"; }
219
220 DataTypePtr getReturnType() const override
221 {
222 return std::make_shared<DataTypeUInt64>();
223 }
224
225 void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
226 {
227 this->data(place).insert(UInt64(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num)), threshold);
228 }
229
230 void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
231 {
232 this->data(place).merge(this->data(rhs), threshold);
233 }
234
235 void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
236 {
237 this->data(place).write(buf, threshold);
238 }
239
240 void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
241 {
242 this->data(place).read(buf, threshold);
243 }
244
245 void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
246 {
247 assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
248 }
249};
250
251
252}
253