#pragma once

#include <cmath>
#include <Common/RadixSort.h>
#include <Common/PODArray.h>
#include <Common/Exception.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/VarInt.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_LARGE_ARRAY_SIZE;
}

/** The algorithm was implemented by Alexei Borzenkov https://github.com/snaury
  * He owns the authorship of the code and half of the comments in this namespace,
  * except for merging, serialization, and sorting, as well as the choice of types and other changes.
  * We thank Alexei Borzenkov for writing the original code.
  */

/** Implementation of the t-digest algorithm (https://github.com/tdunning/t-digest).
  * This implementation is very similar to MergingDigest in Java; however, the decision
  * to merge is made based on the original condition from the article
  * (via a size constraint, using the approximation of the quantile of each
  * centroid, rather than the distance on the curve between their boundary positions).
  * MergingDigest in Java produces significantly fewer centroids than this variant,
  * which negatively affects accuracy at the same compression factor, but provides
  * size guarantees. The author himself, when this variant was proposed to him, said
  * that the size of the digest grows like O(log(n)), while the Java version
  * does not depend on the expected number of points. The Java variant
  * also uses asin, which slows down the algorithm a bit.
  */
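
/** A worked instance of the merge bound used below (illustrative numbers, not from the source):
  * with \delta = 0.01 and N = 1'000'000 points, a centroid near the median (q = 0.5) may absorb up to
  *     4 * 0.5 * (1 - 0.5) * 0.01 * 1'000'000 = 10'000 points,
  * while a centroid near q = 0.001 may absorb only about
  *     4 * 0.001 * (1 - 0.001) * 0.01 * 1'000'000 ≈ 40 points,
  * which is why t-digest keeps the tails much more precise than the middle of the distribution.
  */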
template <typename T>
class QuantileTDigest
{
    using Value = Float32;
    using Count = Float32;

    /** The centroid stores the weight of points around their mean value.
      */
    struct Centroid
    {
        Value mean;
        Count count;

        Centroid() = default;

        explicit Centroid(Value mean_, Count count_)
            : mean(mean_)
            , count(count_)
        {}

        Centroid & operator+=(const Centroid & other)
        {
            count += other.count;
            mean += other.count * (other.mean - mean) / count;
            return *this;
        }

        bool operator<(const Centroid & other) const
        {
            return mean < other.mean;
        }
    };
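
    /** A small worked example of operator+= (hypothetical values): merging {mean = 10, count = 1}
      * with {mean = 20, count = 3} first gives count = 4, then
      *     mean = 10 + 3 * (20 - 10) / 4 = 17.5,
      * i.e. the weighted mean of all four points, computed incrementally.
      */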

    /** :param epsilon: the value of \delta from the article - the error near
      * the quantile 0.5 (default is 0.01, i.e. 1%)
      * :param max_unmerged: when the count of accumulated new points exceeds this
      * value, centroid compression is triggered
      * (default is 2048; higher values require more memory,
      * but improve amortized execution time)
      */
    struct Params
    {
        Value epsilon = 0.01;
        size_t max_unmerged = 2048;
    };

    Params params;

    /// The memory is allocated for several elements at once, so that the whole state occupies 128 bytes.
    static constexpr size_t bytes_in_arena = 128 - sizeof(PODArray<Centroid>) - sizeof(Count) - sizeof(UInt32);
    using Summary = PODArrayWithStackMemory<Centroid, bytes_in_arena>;

    Summary summary;
    Count count = 0;
    UInt32 unmerged = 0;

    /** Linear interpolation at the point x on the line (x1, y1)..(x2, y2).
      */
    static Value interpolate(Value x, Value x1, Value y1, Value x2, Value y2)
    {
        double k = (x - x1) / (x2 - x1);
        return y1 + k * (y2 - y1);
    }
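
    /** For instance (illustrative numbers): interpolate(1.5, 1, 10, 2, 20) computes k = 0.5 and
      * returns 10 + 0.5 * (20 - 10) = 15, the midpoint of the segment.
      */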

    struct RadixSortTraits
    {
        using Element = Centroid;
        using Key = Value;
        using CountType = UInt32;
        using KeyBits = UInt32;

        static constexpr size_t PART_SIZE_BITS = 8;

        using Transform = RadixSortFloatTransform<KeyBits>;
        using Allocator = RadixSortMallocAllocator;

        /// The function to get the key from an array element.
        static Key & extractKey(Element & elem) { return elem.mean; }
    };

    /** Adds a centroid `c` to the digest.
      */
    void addCentroid(const Centroid & c)
    {
        summary.push_back(c);
        count += c.count;
        ++unmerged;
        if (unmerged >= params.max_unmerged)
            compress();
    }

    /** Compresses the accumulated centroids.
      * When merging, the invariant is maintained that the size of each
      * centroid does not exceed `4 q (1 - q) \delta N`.
      */
    void compress()
    {
        if (unmerged > 0)
        {
            RadixSort<RadixSortTraits>::executeLSD(summary.data(), summary.size());

            if (summary.size() > 3)
            {
                /// A pair of consecutive bars of the histogram.
                auto l = summary.begin();
                auto r = std::next(l);

                Count sum = 0;
                while (r != summary.end())
                {
                    /// Of the two quantiles, we use the one that gives the smaller error.

                    /// The ratio of the part of the histogram up to l, including half of l, to the entire histogram. That is, the level of the quantile at position l.
                    Value ql = (sum + l->count * 0.5) / count;
                    Value err = ql * (1 - ql);

                    /// The ratio of the part of the histogram up to l, including l and half of r, to the entire histogram. That is, the level of the quantile at position r.
                    Value qr = (sum + l->count + r->count * 0.5) / count;
                    Value err2 = qr * (1 - qr);

                    if (err > err2)
                        err = err2;

                    Value k = 4 * count * err * params.epsilon;

                    /** The ratio of the weight of the merged pair of bars to all values is no greater
                      * than epsilon multiplied by a quadratic coefficient, which is 1 at the median (4 * 1/2 * 1/2)
                      * and decreases towards the edges, approximately as 4 times the distance to the edge.
                      */

                    if (l->count + r->count <= k)
                    {
                        /// It is possible to merge left and right.
                        /// The left column "eats" the right one.
                        *l += *r;
                    }
                    else
                    {
                        /// Not enough capacity, check the next pair.
                        sum += l->count;
                        ++l;

                        /// We skip all the values "eaten" earlier.
                        if (l != r)
                            *l = *r;
                    }
                    ++r;
                }

                /// At the end of the loop, all values to the right of l were "eaten".
                summary.resize(l - summary.begin() + 1);
            }

            unmerged = 0;
        }
    }
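
    /** A worked pass through the merge condition above (illustrative numbers): with count = 1000 and
      * epsilon = 0.01, two adjacent centroids near the median with weights 3 and 4 get err ≈ 0.5 * (1 - 0.5) = 0.25,
      * hence k = 4 * 1000 * 0.25 * 0.01 = 10; since 3 + 4 = 7 <= 10, the pair is merged. Near the edge,
      * at q ≈ 0.01, k = 4 * 1000 * 0.01 * 0.99 * 0.01 ≈ 0.4, so the same pair would be kept separate.
      */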

public:
    /** Adds the value `x` with a weight of `cnt` (default 1) to the digest.
      */
    void add(T x, UInt64 cnt = 1)
    {
        addCentroid(Centroid(Value(x), Count(cnt)));
    }

    void merge(const QuantileTDigest & other)
    {
        for (const auto & c : other.summary)
            addCentroid(c);
    }

    void serialize(WriteBuffer & buf)
    {
        compress();
        writeVarUInt(summary.size(), buf);
        buf.write(reinterpret_cast<const char *>(summary.data()), summary.size() * sizeof(summary[0]));
    }

    void deserialize(ReadBuffer & buf)
    {
        size_t size = 0;
        readVarUInt(size, buf);

        if (size > params.max_unmerged)
            throw Exception("Too large t-digest summary size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);

        summary.resize(size);
        buf.read(reinterpret_cast<char *>(summary.data()), size * sizeof(summary[0]));

        count = 0;
        for (const auto & c : summary)
            count += c.count;
    }
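
    /** On the wire (as follows from serialize/deserialize above): a varint-encoded centroid count,
      * followed by the raw array of centroids, i.e. (Float32 mean, Float32 count) pairs -
      * 8 bytes per centroid in this instantiation.
      */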

    /** Calculates the quantile q in [0, 1] based on the digest.
      * For an empty digest, returns NaN (or 0 for integral result types).
      */
    template <typename ResultType>
    ResultType getImpl(Float64 level)
    {
        if (summary.empty())
            return std::is_floating_point_v<ResultType> ? NAN : 0;

        compress();

        if (summary.size() == 1)
            return summary.front().mean;

        Float64 x = level * count;
        Float64 prev_x = 0;
        Count sum = 0;
        Value prev_mean = summary.front().mean;

        for (const auto & c : summary)
        {
            Float64 current_x = sum + c.count * 0.5;

            if (current_x >= x)
                return interpolate(x, prev_x, prev_mean, current_x, c.mean);

            sum += c.count;
            prev_mean = c.mean;
            prev_x = current_x;
        }

        return summary.back().mean;
    }
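
    /** A worked call (hypothetical digest state): with centroids {mean = 1, count = 1} and {mean = 2, count = 1},
      * getImpl(0.5) targets x = 0.5 * 2 = 1; the first centroid sits at current_x = 0.5, the second at 1.5,
      * so the result is interpolate(1, 0.5, 1, 1.5, 2) = 1.5, halfway between the two means.
      */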

    /** Get multiple quantiles (`size` of them).
      * levels - an array of the desired quantile levels, in arbitrary order.
      * levels_permutation - a permutation of level indices: the i-th position holds the index of the i-th smallest level in the `levels` array.
      * result - the array where the results are written, in the order of `levels`.
      */
    template <typename ResultType>
    void getManyImpl(const Float64 * levels, const size_t * levels_permutation, size_t size, ResultType * result)
    {
        if (summary.empty())
        {
            for (size_t result_num = 0; result_num < size; ++result_num)
                result[result_num] = std::is_floating_point_v<ResultType> ? NAN : 0;
            return;
        }

        compress();

        if (summary.size() == 1)
        {
            for (size_t result_num = 0; result_num < size; ++result_num)
                result[result_num] = summary.front().mean;
            return;
        }

        Float64 x = levels[levels_permutation[0]] * count;
        Float64 prev_x = 0;
        Count sum = 0;
        Value prev_mean = summary.front().mean;

        size_t result_num = 0;
        for (const auto & c : summary)
        {
            Float64 current_x = sum + c.count * 0.5;

            while (current_x >= x)
            {
                result[levels_permutation[result_num]] = interpolate(x, prev_x, prev_mean, current_x, c.mean);

                ++result_num;
                if (result_num >= size)
                    return;

                x = levels[levels_permutation[result_num]] * count;
            }

            sum += c.count;
            prev_mean = c.mean;
            prev_x = current_x;
        }

        auto rest_of_results = summary.back().mean;
        for (; result_num < size; ++result_num)
            result[levels_permutation[result_num]] = rest_of_results;
    }

    T get(Float64 level)
    {
        return getImpl<T>(level);
    }

    Float32 getFloat(Float64 level)
    {
        return getImpl<Float32>(level);
    }

    void getMany(const Float64 * levels, const size_t * indices, size_t size, T * result)
    {
        getManyImpl(levels, indices, size, result);
    }

    void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float32 * result)
    {
        getManyImpl(levels, indices, size, result);
    }
};
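
/** A minimal usage sketch (hypothetical caller code, not part of this header):
  *
  *     QuantileTDigest<Float64> digest;
  *     for (Float64 x : {3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0})
  *         digest.add(x);
  *     Float64 median = digest.get(0.5);    /// approximate 50th percentile
  *     Float32 p99 = digest.getFloat(0.99); /// approximate 99th percentile
  */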

}