1#pragma once
2
3#include <Common/HashTable/HashMap.h>
4#include <Common/NaNUtils.h>
5
6
7namespace DB
8{
9
/// Error codes referenced by this header. They are only declared here (`extern`), not defined.
namespace ErrorCodes { extern const int NOT_IMPLEMENTED; }
14
15/** Calculates quantile by counting number of occurrences for each value in a hash map.
16 *
17 * It uses O(distinct(N)) memory. Can be naturally applied for values with weight.
18 * In case of many identical values, it can be more efficient than QuantileExact even when weight is not used.
19 */
20template <typename Value>
21struct QuantileExactWeighted
22{
23 struct Int128Hash
24 {
25 size_t operator()(Int128 x) const
26 {
27 return CityHash_v1_0_2::Hash128to64({x >> 64, x & 0xffffffffffffffffll});
28 }
29 };
30
31 using Weight = UInt64;
32 using UnderlyingType = typename NativeType<Value>::Type;
33 using Hasher = std::conditional_t<std::is_same_v<Value, Decimal128>, Int128Hash, HashCRC32<UnderlyingType>>;
34
35 /// When creating, the hash table must be small.
36 using Map = HashMap<
37 UnderlyingType, Weight,
38 Hasher,
39 HashTableGrower<4>,
40 HashTableAllocatorWithStackMemory<sizeof(std::pair<Value, Weight>) * (1 << 3)>
41 >;
42
43 Map map;
44
45 void add(const Value & x)
46 {
47 /// We must skip NaNs as they are not compatible with comparison sorting.
48 if (!isNaN(x))
49 ++map[x];
50 }
51
52 void add(const Value & x, Weight weight)
53 {
54 if (!isNaN(x))
55 map[x] += weight;
56 }
57
58 void merge(const QuantileExactWeighted & rhs)
59 {
60 for (const auto & pair : rhs.map)
61 map[pair.getKey()] += pair.getMapped();
62 }
63
64 void serialize(WriteBuffer & buf) const
65 {
66 map.write(buf);
67 }
68
69 void deserialize(ReadBuffer & buf)
70 {
71 typename Map::Reader reader(buf);
72 while (reader.next())
73 {
74 const auto & pair = reader.get();
75 map[pair.first] = pair.second;
76 }
77 }
78
79 /// Get the value of the `level` quantile. The level must be between 0 and 1.
80 Value get(Float64 level) const
81 {
82 size_t size = map.size();
83
84 if (0 == size)
85 return std::numeric_limits<Value>::quiet_NaN();
86
87 /// Copy the data to a temporary array to get the element you need in order.
88 using Pair = typename Map::value_type;
89 std::unique_ptr<Pair[]> array_holder(new Pair[size]);
90 Pair * array = array_holder.get();
91
92 size_t i = 0;
93 UInt64 sum_weight = 0;
94 for (const auto & pair : map)
95 {
96 sum_weight += pair.getMapped();
97 array[i] = pair.getValue();
98 ++i;
99 }
100
101 std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
102
103 UInt64 threshold = std::ceil(sum_weight * level);
104 UInt64 accumulated = 0;
105
106 const Pair * it = array;
107 const Pair * end = array + size;
108 while (it < end)
109 {
110 accumulated += it->second;
111
112 if (accumulated >= threshold)
113 break;
114
115 ++it;
116 }
117
118 if (it == end)
119 --it;
120
121 return it->first;
122 }
123
124 /// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
125 /// indices - an array of index levels such that the corresponding elements will go in ascending order.
126 void getMany(const Float64 * levels, const size_t * indices, size_t num_levels, Value * result) const
127 {
128 size_t size = map.size();
129
130 if (0 == size)
131 {
132 for (size_t i = 0; i < num_levels; ++i)
133 result[i] = Value();
134 return;
135 }
136
137 /// Copy the data to a temporary array to get the element you need in order.
138 using Pair = typename Map::value_type;
139 std::unique_ptr<Pair[]> array_holder(new Pair[size]);
140 Pair * array = array_holder.get();
141
142 size_t i = 0;
143 UInt64 sum_weight = 0;
144 for (const auto & pair : map)
145 {
146 sum_weight += pair.getMapped();
147 array[i] = pair.getValue();
148 ++i;
149 }
150
151 std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
152
153 UInt64 accumulated = 0;
154
155 const Pair * it = array;
156 const Pair * end = array + size;
157
158 size_t level_index = 0;
159 UInt64 threshold = std::ceil(sum_weight * levels[indices[level_index]]);
160
161 while (it < end)
162 {
163 accumulated += it->second;
164
165 while (accumulated >= threshold)
166 {
167 result[indices[level_index]] = it->first;
168 ++level_index;
169
170 if (level_index == num_levels)
171 return;
172
173 threshold = std::ceil(sum_weight * levels[indices[level_index]]);
174 }
175
176 ++it;
177 }
178
179 while (level_index < num_levels)
180 {
181 result[indices[level_index]] = array[size - 1].first;
182 ++level_index;
183 }
184 }
185
186 /// The same, but in the case of an empty state, NaN is returned.
187 Float64 getFloat(Float64) const
188 {
189 throw Exception("Method getFloat is not implemented for QuantileExact", ErrorCodes::NOT_IMPLEMENTED);
190 }
191
192 void getManyFloat(const Float64 *, const size_t *, size_t, Float64 *) const
193 {
194 throw Exception("Method getManyFloat is not implemented for QuantileExact", ErrorCodes::NOT_IMPLEMENTED);
195 }
196};
197
198}
199