1 | #pragma once |
2 | |
3 | #include <Common/HashTable/HashMap.h> |
4 | #include <Common/NaNUtils.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | namespace ErrorCodes |
11 | { |
12 | extern const int NOT_IMPLEMENTED; |
13 | } |
14 | |
15 | /** Calculates quantile by counting number of occurrences for each value in a hash map. |
16 | * |
17 | * It uses O(distinct(N)) memory. Can be naturally applied for values with weight. |
18 | * In case of many identical values, it can be more efficient than QuantileExact even when weight is not used. |
19 | */ |
20 | template <typename Value> |
21 | struct QuantileExactWeighted |
22 | { |
23 | struct Int128Hash |
24 | { |
25 | size_t operator()(Int128 x) const |
26 | { |
27 | return CityHash_v1_0_2::Hash128to64({x >> 64, x & 0xffffffffffffffffll}); |
28 | } |
29 | }; |
30 | |
31 | using Weight = UInt64; |
32 | using UnderlyingType = typename NativeType<Value>::Type; |
33 | using Hasher = std::conditional_t<std::is_same_v<Value, Decimal128>, Int128Hash, HashCRC32<UnderlyingType>>; |
34 | |
35 | /// When creating, the hash table must be small. |
36 | using Map = HashMap< |
37 | UnderlyingType, Weight, |
38 | Hasher, |
39 | HashTableGrower<4>, |
40 | HashTableAllocatorWithStackMemory<sizeof(std::pair<Value, Weight>) * (1 << 3)> |
41 | >; |
42 | |
43 | Map map; |
44 | |
45 | void add(const Value & x) |
46 | { |
47 | /// We must skip NaNs as they are not compatible with comparison sorting. |
48 | if (!isNaN(x)) |
49 | ++map[x]; |
50 | } |
51 | |
52 | void add(const Value & x, Weight weight) |
53 | { |
54 | if (!isNaN(x)) |
55 | map[x] += weight; |
56 | } |
57 | |
58 | void merge(const QuantileExactWeighted & rhs) |
59 | { |
60 | for (const auto & pair : rhs.map) |
61 | map[pair.getKey()] += pair.getMapped(); |
62 | } |
63 | |
64 | void serialize(WriteBuffer & buf) const |
65 | { |
66 | map.write(buf); |
67 | } |
68 | |
69 | void deserialize(ReadBuffer & buf) |
70 | { |
71 | typename Map::Reader reader(buf); |
72 | while (reader.next()) |
73 | { |
74 | const auto & pair = reader.get(); |
75 | map[pair.first] = pair.second; |
76 | } |
77 | } |
78 | |
79 | /// Get the value of the `level` quantile. The level must be between 0 and 1. |
80 | Value get(Float64 level) const |
81 | { |
82 | size_t size = map.size(); |
83 | |
84 | if (0 == size) |
85 | return std::numeric_limits<Value>::quiet_NaN(); |
86 | |
87 | /// Copy the data to a temporary array to get the element you need in order. |
88 | using Pair = typename Map::value_type; |
89 | std::unique_ptr<Pair[]> array_holder(new Pair[size]); |
90 | Pair * array = array_holder.get(); |
91 | |
92 | size_t i = 0; |
93 | UInt64 sum_weight = 0; |
94 | for (const auto & pair : map) |
95 | { |
96 | sum_weight += pair.getMapped(); |
97 | array[i] = pair.getValue(); |
98 | ++i; |
99 | } |
100 | |
101 | std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; }); |
102 | |
103 | UInt64 threshold = std::ceil(sum_weight * level); |
104 | UInt64 accumulated = 0; |
105 | |
106 | const Pair * it = array; |
107 | const Pair * end = array + size; |
108 | while (it < end) |
109 | { |
110 | accumulated += it->second; |
111 | |
112 | if (accumulated >= threshold) |
113 | break; |
114 | |
115 | ++it; |
116 | } |
117 | |
118 | if (it == end) |
119 | --it; |
120 | |
121 | return it->first; |
122 | } |
123 | |
124 | /// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address. |
125 | /// indices - an array of index levels such that the corresponding elements will go in ascending order. |
126 | void getMany(const Float64 * levels, const size_t * indices, size_t num_levels, Value * result) const |
127 | { |
128 | size_t size = map.size(); |
129 | |
130 | if (0 == size) |
131 | { |
132 | for (size_t i = 0; i < num_levels; ++i) |
133 | result[i] = Value(); |
134 | return; |
135 | } |
136 | |
137 | /// Copy the data to a temporary array to get the element you need in order. |
138 | using Pair = typename Map::value_type; |
139 | std::unique_ptr<Pair[]> array_holder(new Pair[size]); |
140 | Pair * array = array_holder.get(); |
141 | |
142 | size_t i = 0; |
143 | UInt64 sum_weight = 0; |
144 | for (const auto & pair : map) |
145 | { |
146 | sum_weight += pair.getMapped(); |
147 | array[i] = pair.getValue(); |
148 | ++i; |
149 | } |
150 | |
151 | std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; }); |
152 | |
153 | UInt64 accumulated = 0; |
154 | |
155 | const Pair * it = array; |
156 | const Pair * end = array + size; |
157 | |
158 | size_t level_index = 0; |
159 | UInt64 threshold = std::ceil(sum_weight * levels[indices[level_index]]); |
160 | |
161 | while (it < end) |
162 | { |
163 | accumulated += it->second; |
164 | |
165 | while (accumulated >= threshold) |
166 | { |
167 | result[indices[level_index]] = it->first; |
168 | ++level_index; |
169 | |
170 | if (level_index == num_levels) |
171 | return; |
172 | |
173 | threshold = std::ceil(sum_weight * levels[indices[level_index]]); |
174 | } |
175 | |
176 | ++it; |
177 | } |
178 | |
179 | while (level_index < num_levels) |
180 | { |
181 | result[indices[level_index]] = array[size - 1].first; |
182 | ++level_index; |
183 | } |
184 | } |
185 | |
186 | /// The same, but in the case of an empty state, NaN is returned. |
187 | Float64 getFloat(Float64) const |
188 | { |
189 | throw Exception("Method getFloat is not implemented for QuantileExact" , ErrorCodes::NOT_IMPLEMENTED); |
190 | } |
191 | |
192 | void getManyFloat(const Float64 *, const size_t *, size_t, Float64 *) const |
193 | { |
194 | throw Exception("Method getManyFloat is not implemented for QuantileExact" , ErrorCodes::NOT_IMPLEMENTED); |
195 | } |
196 | }; |
197 | |
198 | } |
199 | |