#pragma once

#include <math.h>

#include <common/Types.h>

#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>

#include <Common/HashTable/HashTableAllocator.h>
#include <Common/HashTable/Hash.h>


/** Approximate calculation of anything is, as usual, constructed according to the following scheme:
  * - some data structure is used to calculate the value of X;
  * - not all values are added to the data structure, but only selected ones (according to some selectivity criterion);
  * - after processing all elements, the data structure is in some state S;
  * - as the approximate value of X, the value calculated according to the maximum likelihood principle is returned:
  *   for what real value of X is the probability of finding the data structure in the obtained state S maximal.
  */
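
/** A concrete instance of this principle for the structure below (a sketch of the reasoning,
  * not a statement taken verbatim from the code): each distinct value survives the skip filter
  * independently with probability 2^-skip_degree, so after observing m_size survivors,
  * N ≈ m_size * 2^skip_degree is approximately the maximum likelihood estimate of the number
  * of distinct values; this is essentially what the size() method returns, plus corrections.
  */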

/** In particular, what is described below can be found under the name of the BJKST algorithm.
  */

/** Very simple hash-set for the approximate number of unique values.
  * Works like this:
  * - you can insert UInt64;
  * - before insertion, a hash function UInt64 -> UInt32 is calculated first;
  * - the original value is not saved (it is lost);
  * - all further operations are performed on these hashes;
  * - the hash table is constructed according to the following scheme:
  *   - open addressing (a single buffer; the position in the buffer is calculated as the remainder of division by its size);
  *   - linear probing (if a cell is already occupied, the cell following it is taken, and so on);
  *   - an empty cell is encoded as zero; to remember whether the zero hash itself is present in the set, a separate bool variable is used;
  *   - the buffer doubles in size when it is more than 50% full;
  *   - if the set contains more than UNIQUES_HASH_MAX_SIZE elements, all elements whose hashes are not divisible by 2
  *     are removed from the set, and from then on such elements are not inserted into it;
  *   - if the situation repeats, only elements whose hashes are divisible by 4 are kept, and so on;
  * - the size() method returns the approximate number of distinct values that have been inserted into the set
  *   (see the usage sketch below);
  * - there are methods for quick reading and writing in binary and text form.
  */
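
/** A minimal usage sketch (illustrative only; it assumes the usual ClickHouse build
  * environment with the includes listed above):
  *
  *     UniquesHashSet<> set;
  *     for (UInt64 i = 0; i < 1000000; ++i)
  *         set.insert(i);                  /// the value is hashed; some hashes may be skipped
  *     size_t approx = set.size();         /// close to 1000000, but not exact
  */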

/// The maximum degree of buffer size before the values are discarded
#define UNIQUES_HASH_MAX_SIZE_DEGREE 17

/// The maximum number of elements before the values are discarded
#define UNIQUES_HASH_MAX_SIZE (1ULL << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))

/** The number of least significant bits used for thinning. The remaining high-order bits are used to determine the position in the hash table.
  * (The high-order bits are used because after dropping some of the values, the low-order bits would be constant.)
  */
#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)

/// Initial buffer size degree
#define UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE 4
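
/** For orientation, the concrete values these macros give (derived from the definitions above):
  * the buffer grows up to 2^17 cells, thinning starts once more than 2^16 = 65536 elements
  * are stored, the low 32 - 17 = 15 bits of a hash are checked by the skip filter,
  * and the high bits determine the position in the table.
  */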


/** This hash function is not the most optimal, but UniquesHashSet states calculated with it
  * are stored in many places on disk (in Yandex.Metrika), so it continues to be used.
  */
struct UniquesHashSetDefaultHash
{
    size_t operator() (UInt64 x) const
    {
        return intHash32<0>(x);
    }
};


template <typename Hash = UniquesHashSetDefaultHash>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
{
private:
    using Value = UInt64;
    using HashValue = UInt32;
    using Allocator = HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;

    UInt32 m_size;        /// Number of elements
    UInt8 size_degree;    /// The size of the table as a power of 2
    UInt8 skip_degree;    /// Skip elements not divisible by 2 ^ skip_degree
    bool has_zero;        /// The hash table contains an element with a hash value of 0.

    HashValue * buf;

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
    /// For profiling.
    mutable size_t collisions;
#endif

    void alloc(UInt8 new_size_degree)
    {
        buf = reinterpret_cast<HashValue *>(Allocator::alloc((1ULL << new_size_degree) * sizeof(buf[0])));
        size_degree = new_size_degree;
    }

    void free()
    {
        if (buf)
        {
            Allocator::free(buf, buf_size() * sizeof(buf[0]));
            buf = nullptr;
        }
    }

    inline size_t buf_size() const { return 1ULL << size_degree; }
    inline size_t max_fill() const { return 1ULL << (size_degree - 1); }
    inline size_t mask() const { return buf_size() - 1; }
    inline size_t place(HashValue x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }

    /// Whether the value is divisible by 2 ^ skip_degree
    inline bool good(HashValue hash) const
    {
        return hash == ((hash >> skip_degree) << skip_degree);
    }
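
    /** For example (a sketch with hypothetical numbers): with skip_degree = 2, `good` passes
      * only hashes divisible by 4 = 2^2, i.e. roughly a quarter of all distinct hashes:
      *
      *     good(8);     /// true: the low two bits are zero
      *     good(10);    /// false: 10 is not divisible by 4
      */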

    HashValue hash(Value key) const
    {
        return Hash()(key);
    }

    /// Delete all values whose hashes are not divisible by 2 ^ skip_degree
    void rehash()
    {
        for (size_t i = 0; i < buf_size(); ++i)
        {
            if (buf[i])
            {
                if (!good(buf[i]))
                {
                    buf[i] = 0;
                    --m_size;
                }
                /** After removing some elements, there may now be room for elements
                  * that were placed further away than necessary because of collisions.
                  * They need to be moved.
                  */
                else if (i != place(buf[i]))
                {
                    HashValue x = buf[i];
                    buf[i] = 0;
                    reinsertImpl(x);
                }
            }
        }

        /** We must process the first collision resolution chain once again.
          * See the comment in the `resize` function.
          */
        for (size_t i = 0; i < buf_size() && buf[i]; ++i)
        {
            if (i != place(buf[i]))
            {
                HashValue x = buf[i];
                buf[i] = 0;
                reinsertImpl(x);
            }
        }
    }

    /// Double the size of the buffer, or grow it to new_size_degree if that is non-zero.
    void resize(size_t new_size_degree = 0)
    {
        size_t old_size = buf_size();

        if (!new_size_degree)
            new_size_degree = size_degree + 1;

        /// Expand the space.
        buf = reinterpret_cast<HashValue *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1ULL << new_size_degree) * sizeof(buf[0])));
        size_degree = new_size_degree;

        /** Now some of the elements may need to be moved to a new location.
          * An element can stay in place, move to a new location "to the right",
          * or move to the left of its collision resolution chain, because the elements to the left of it have moved to their new "right" locations.
          * There is also a special case:
          *   if the element was supposed to be at the end of the old buffer,              [        x]
          *   but sits at the beginning because of the collision resolution chain,         [o       x]
          *   then after the resize it will at first be out of place again,                [        xo        ]
          *   and in order to move it to where it belongs, we will have,
          *   after transferring all the elements from the old half of the buffer,         [         o   x    ]
          *   to process the tail of the collision resolution chain immediately after it.  [        o     x   ]
          * This is why the `|| buf[i]` condition below.
          */
        for (size_t i = 0; i < old_size || buf[i]; ++i)
        {
            HashValue x = buf[i];
            if (!x)
                continue;

            size_t place_value = place(x);

            /// The element is in its place.
            if (place_value == i)
                continue;

            while (buf[place_value] && buf[place_value] != x)
            {
                ++place_value;
                place_value &= mask();

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
                ++collisions;
#endif
            }

            /// The element remained in its place (the probe chain led back to it).
            if (buf[place_value] == x)
                continue;

            buf[place_value] = x;
            buf[i] = 0;
        }
    }

    /// Insert a value.
    void insertImpl(HashValue x)
    {
        if (x == 0)
        {
            m_size += !has_zero;
            has_zero = true;
            return;
        }

        size_t place_value = place(x);
        while (buf[place_value] && buf[place_value] != x)
        {
            ++place_value;
            place_value &= mask();

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
            ++collisions;
#endif
        }

        if (buf[place_value] == x)
            return;

        buf[place_value] = x;
        ++m_size;
    }

    /** Insert into the new buffer a value that was in the old buffer.
      * Used when increasing the buffer size, and also when reading from a file.
      */
    void reinsertImpl(HashValue x)
    {
        size_t place_value = place(x);
        while (buf[place_value])
        {
            ++place_value;
            place_value &= mask();

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
            ++collisions;
#endif
        }

        buf[place_value] = x;
    }

    /** If the hash table is full enough, resize it.
      * If there are too many elements, discard half of them (by increasing skip_degree
      * and rehashing) until there are few enough of them.
      */
    void shrinkIfNeed()
    {
        if (unlikely(m_size > max_fill()))
        {
            if (m_size > UNIQUES_HASH_MAX_SIZE)
            {
                while (m_size > UNIQUES_HASH_MAX_SIZE)
                {
                    ++skip_degree;
                    rehash();
                }
            }
            else
                resize();
        }
    }


public:
    using value_type = Value;

    UniquesHashSet() :
        m_size(0),
        skip_degree(0),
        has_zero(false)
    {
        alloc(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE);
#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
        collisions = 0;
#endif
    }

    UniquesHashSet(const UniquesHashSet & rhs)
        : m_size(rhs.m_size), skip_degree(rhs.skip_degree), has_zero(rhs.has_zero)
    {
        alloc(rhs.size_degree);
        memcpy(buf, rhs.buf, buf_size() * sizeof(buf[0]));
    }

    UniquesHashSet & operator= (const UniquesHashSet & rhs)
    {
        if (size_degree != rhs.size_degree)
        {
            free();
            alloc(rhs.size_degree);
        }

        m_size = rhs.m_size;
        skip_degree = rhs.skip_degree;
        has_zero = rhs.has_zero;

        memcpy(buf, rhs.buf, buf_size() * sizeof(buf[0]));

        return *this;
    }

    ~UniquesHashSet()
    {
        free();
    }

    void insert(Value x)
    {
        HashValue hash_value = hash(x);
        if (!good(hash_value))
            return;

        insertImpl(hash_value);
        shrinkIfNeed();
    }

    size_t size() const
    {
        if (0 == skip_degree)
            return m_size;

        size_t res = m_size * (1ULL << skip_degree);

        /** A pseudo-random remainder, so that it is not noticeable
          * that the number is divisible by a power of two.
          */
        res += (intHashCRC32(m_size) & ((1ULL << skip_degree) - 1));

        /** Correction of the systematic error due to collisions when hashing into UInt32.
          * The `fixed_res(res)` formula answers the question: how many distinct elements (fixed_res)
          * have to be scattered uniformly at random over 2^32 buckets so that the average number
          * of occupied buckets is res. From E[occupied] = p32 * (1 - exp(-fixed_res / p32)) = res
          * it follows that fixed_res = p32 * (log(p32) - log(p32 - res)).
          */
        size_t p32 = 1ULL << 32;
        size_t fixed_res = round(p32 * (log(p32) - log(p32 - res)));
        return fixed_res;
    }
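
    /** A worked example for size() above (hypothetical numbers): with skip_degree = 4
      * and m_size = 40000, the raw estimate is 40000 * 16 = 640000, the pseudo-random
      * remainder adds at most 15, and the collision correction adds roughly 48 more
      * (approximately 640000 * 640000 / 2^33), i.e. well under 0.01% at this scale.
      */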

    void merge(const UniquesHashSet & rhs)
    {
        if (rhs.skip_degree > skip_degree)
        {
            skip_degree = rhs.skip_degree;
            rehash();
        }

        if (!has_zero && rhs.has_zero)
        {
            has_zero = true;
            ++m_size;
            shrinkIfNeed();
        }

        for (size_t i = 0; i < rhs.buf_size(); ++i)
        {
            if (rhs.buf[i] && good(rhs.buf[i]))
            {
                insertImpl(rhs.buf[i]);
                shrinkIfNeed();
            }
        }
    }

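    /** The binary format used by write/read below (as implemented here, not a separate spec):
      * skip_degree as a single byte, then m_size as a VarUInt, then the m_size hash values
      * themselves as UInt32 (zero first, if present; the rest in buffer order).
      */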
    void write(DB::WriteBuffer & wb) const
    {
        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot write UniquesHashSet: too large size_degree.");

        DB::writeIntBinary(skip_degree, wb);
        DB::writeVarUInt(m_size, wb);

        if (has_zero)
        {
            HashValue x = 0;
            DB::writeIntBinary(x, wb);
        }

        for (size_t i = 0; i < buf_size(); ++i)
            if (buf[i])
                DB::writeIntBinary(buf[i], wb);
    }

    void read(DB::ReadBuffer & rb)
    {
        has_zero = false;

        DB::readIntBinary(skip_degree, rb);
        DB::readVarUInt(m_size, rb);

        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        free();

        UInt8 new_size_degree = m_size <= 1
            ? UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE
            : std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(m_size - 1)) + 2);

        alloc(new_size_degree);

        for (size_t i = 0; i < m_size; ++i)
        {
            HashValue x = 0;
            DB::readIntBinary(x, rb);
            if (x == 0)
                has_zero = true;
            else
                reinsertImpl(x);
        }
    }

    void readAndMerge(DB::ReadBuffer & rb)
    {
        UInt8 rhs_skip_degree = 0;
        DB::readIntBinary(rhs_skip_degree, rb);

        if (rhs_skip_degree > skip_degree)
        {
            skip_degree = rhs_skip_degree;
            rehash();
        }

        size_t rhs_size = 0;
        DB::readVarUInt(rhs_size, rb);

        if (rhs_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        if ((1ULL << size_degree) < rhs_size)
        {
            UInt8 new_size_degree = std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(rhs_size - 1)) + 2);
            resize(new_size_degree);
        }

        for (size_t i = 0; i < rhs_size; ++i)
        {
            HashValue x = 0;
            DB::readIntBinary(x, rb);
            insertHash(x);
        }
    }

    static void skip(DB::ReadBuffer & rb)
    {
        size_t size = 0;

        rb.ignore();    /// skip one byte: skip_degree
        DB::readVarUInt(size, rb);

        if (size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        rb.ignore(sizeof(HashValue) * size);
    }

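    /** The text format used by writeText/readText below (as implemented here):
      * skip_degree, m_size, and then the hash values, all separated by commas.
      */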
    void writeText(DB::WriteBuffer & wb) const
    {
        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot write UniquesHashSet: too large size_degree.");

        DB::writeIntText(skip_degree, wb);
        wb.write(",", 1);
        DB::writeIntText(m_size, wb);

        if (has_zero)
            wb.write(",0", 2);

        for (size_t i = 0; i < buf_size(); ++i)
        {
            if (buf[i])
            {
                wb.write(",", 1);
                DB::writeIntText(buf[i], wb);
            }
        }
    }

    void readText(DB::ReadBuffer & rb)
    {
        has_zero = false;

        DB::readIntText(skip_degree, rb);
        DB::assertChar(',', rb);
        DB::readIntText(m_size, rb);

        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        free();

        UInt8 new_size_degree = m_size <= 1
            ? UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE
            : std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(m_size - 1)) + 2);

        alloc(new_size_degree);

        for (size_t i = 0; i < m_size; ++i)
        {
            HashValue x = 0;
            DB::assertChar(',', rb);
            DB::readIntText(x, rb);
            if (x == 0)
                has_zero = true;
            else
                reinsertImpl(x);
        }
    }

    void insertHash(HashValue hash_value)
    {
        if (!good(hash_value))
            return;

        insertImpl(hash_value);
        shrinkIfNeed();
    }

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
    size_t getCollisions() const
    {
        return collisions;
    }
#endif
};


#undef UNIQUES_HASH_MAX_SIZE_DEGREE
#undef UNIQUES_HASH_MAX_SIZE
#undef UNIQUES_HASH_BITS_FOR_SKIP
#undef UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE