1#pragma once
2
3#include <math.h>
4
5#include <common/Types.h>
6
7#include <IO/WriteBuffer.h>
8#include <IO/WriteHelpers.h>
9#include <IO/ReadBuffer.h>
10#include <IO/ReadHelpers.h>
11#include <IO/VarInt.h>
12
13#include <Common/HashTable/HashTableAllocator.h>
14#include <Common/HashTable/Hash.h>
15
16
17/** Approximate calculation of anything, as usual, is constructed according to the following scheme:
18 * - some data structure is used to calculate the value of X;
 * - not all values are added to the data structure, but only selected ones (according to some selectivity criterion);
20 * - after processing all elements, the data structure is in some state S;
21 * - as an approximate value of X, the value calculated according to the maximum likelihood principle is returned:
22 * at what real value X, the probability of finding the data structure in the obtained state S is maximal.
23 */
24
25/** In particular, what is described below can be found by the name of the BJKST algorithm.
26 */
27
28/** Very simple hash-set for approximate number of unique values.
29 * Works like this:
30 * - you can insert UInt64;
31 * - before insertion, first the hash function UInt64 -> UInt32 is calculated;
32 * - the original value is not saved (lost);
33 * - further all operations are made with these hashes;
34 * - hash table is constructed according to the scheme:
35 * - open addressing (one buffer, position in buffer is calculated by taking remainder of division by its size);
36 * - linear probing (if the cell already has a value, then the cell following it is taken, etc.);
37 * - the missing value is zero-encoded; to remember presence of zero in set, separate variable of type bool is used;
38 * - buffer growth by 2 times when filling more than 50%;
 * - if the set contains more than UNIQUES_HASH_MAX_SIZE elements, then all elements whose hashes are
 *   not divisible by 2 are removed from the set, and from then on such elements are no longer inserted;
 * - if the situation repeats, then only elements whose hashes are divisible by 4, etc., are kept.
42 * - the size() method returns an approximate number of elements that have been inserted into the set;
43 * - there are methods for quick reading and writing in binary and text form.
44 */
45
46/// The maximum degree of buffer size before the values are discarded
47#define UNIQUES_HASH_MAX_SIZE_DEGREE 17
48
49/// The maximum number of elements before the values are discarded
50#define UNIQUES_HASH_MAX_SIZE (1ULL << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))
51
/** The number of least significant bits used for thinning. The remaining high-order bits are used to determine the position in the hash table.
 * (the high-order bits are taken because the low-order bits will be constant after some of the values are dropped)
 */
55#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)
56
57/// Initial buffer size degree
58#define UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE 4
59
60
61/** This hash function is not the most optimal, but UniquesHashSet states counted with it,
62 * stored in many places on disks (in the Yandex.Metrika), so it continues to be used.
63 */
64struct UniquesHashSetDefaultHash
65{
66 size_t operator() (UInt64 x) const
67 {
68 return intHash32<0>(x);
69 }
70};
71
72
template <typename Hash = UniquesHashSetDefaultHash>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
{
private:
    using Value = UInt64;       /// Type of the inserted keys.
    using HashValue = UInt32;   /// Only the 32-bit hash of a key is stored; the key itself is lost.
    using Allocator = HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;

    UInt32 m_size; /// Number of elements
    UInt8 size_degree; /// The size of the table as a power of 2
    UInt8 skip_degree; /// Skip elements not divisible by 2 ^ skip_degree
    bool has_zero; /// The hash table contains an element with a hash value of 0.

    HashValue * buf;

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
    /// For profiling.
    mutable size_t collisions;
#endif

    /// Allocate a buffer of 2^new_size_degree cells and remember the degree.
    /// NOTE: an empty cell is encoded as 0, and no memset follows — this relies on
    /// the allocator returning zero-filled memory (see HashTableAllocator).
    void alloc(UInt8 new_size_degree)
    {
        buf = reinterpret_cast<HashValue *>(Allocator::alloc((1ULL << new_size_degree) * sizeof(buf[0])));
        size_degree = new_size_degree;
    }

    /// Release the buffer. Idempotent: safe to call when buf is already nullptr.
    void free()
    {
        if (buf)
        {
            Allocator::free(buf, buf_size() * sizeof(buf[0]));
            buf = nullptr;
        }
    }

    /// Number of cells in the buffer.
    inline size_t buf_size() const { return 1ULL << size_degree; }
    /// Resize threshold: the table is grown when more than 50% full.
    inline size_t max_fill() const { return 1ULL << (size_degree - 1); }
    /// Bit mask for wrapping a probe position into the buffer.
    inline size_t mask() const { return buf_size() - 1; }
    /// Ideal cell for a hash: taken from the high-order bits (the low-order bits are used for thinning).
    inline size_t place(HashValue x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }

    /// The value is divided by 2 ^ skip_degree, i.e. it survives the current thinning level.
    inline bool good(HashValue hash) const
    {
        return hash == ((hash >> skip_degree) << skip_degree);
    }

    /// Compute the 32-bit hash of a key using the Hash functor.
    HashValue hash(Value key) const
    {
        return Hash()(key);
    }

    /// Delete all values whose hashes do not divide by 2 ^ skip_degree.
    /// Called after skip_degree has been increased, to thin out the stored hashes.
    void rehash()
    {
        for (size_t i = 0; i < buf_size(); ++i)
        {
            if (buf[i])
            {
                if (!good(buf[i]))
                {
                    buf[i] = 0;
                    --m_size;
                }
                /** After removing the elements, there may have been room for items,
                  * which were placed further than necessary, due to a collision.
                  * You need to move them.
                  */
                else if (i != place(buf[i]))
                {
                    HashValue x = buf[i];
                    buf[i] = 0;
                    reinsertImpl(x);
                }
            }
        }

        /** We must process first collision resolution chain once again.
          * Look at the comment in "resize" function.
          */
        for (size_t i = 0; i < buf_size() && buf[i]; ++i)
        {
            if (i != place(buf[i]))
            {
                HashValue x = buf[i];
                buf[i] = 0;
                reinsertImpl(x);
            }
        }
    }

    /// Increase the size of the buffer 2 times or up to new_size_degree, if it is non-zero.
    void resize(size_t new_size_degree = 0)
    {
        size_t old_size = buf_size();

        if (!new_size_degree)
            new_size_degree = size_degree + 1;

        /// Expand the space.
        /// NOTE: like alloc(), this relies on the allocator zero-filling the newly added half.
        buf = reinterpret_cast<HashValue *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1ULL << new_size_degree) * sizeof(buf[0])));
        size_degree = new_size_degree;

        /** Now some items may need to be moved to a new location.
          * The element can stay in place, or move to a new location "on the right",
          * or move to the left of the collision resolution chain, because the elements to the left of it have been moved to the new "right" location.
          * There is also a special case:
          *    if the element was to be at the end of the old buffer,                  [        x]
          *    but is at the beginning because of the collision resolution chain,      [o       x]
          *    then after resizing, it will first be out of place again,               [        xo        ]
          *    and in order to transfer it to where it belongs,
          *    we will have to, after transferring all elements from the old half,     [         o   x    ]
          *    process the tail of the collision resolution chain immediately after it [        o    x    ]
          * This is why the "|| buf[i]" below.
          */
        for (size_t i = 0; i < old_size || buf[i]; ++i)
        {
            HashValue x = buf[i];
            if (!x)
                continue;

            size_t place_value = place(x);

            /// The element is in its place.
            if (place_value == i)
                continue;

            /// Linear probing to find the element's new cell (or itself).
            while (buf[place_value] && buf[place_value] != x)
            {
                ++place_value;
                place_value &= mask();

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
                ++collisions;
#endif
            }

            /// The element remained in its place.
            if (buf[place_value] == x)
                continue;

            buf[place_value] = x;
            buf[i] = 0;
        }
    }

    /// Insert a hash value. Zero is stored out-of-band in the has_zero flag
    /// (a zero cell means "empty"). Duplicates are ignored.
    void insertImpl(HashValue x)
    {
        if (x == 0)
        {
            m_size += !has_zero;
            has_zero = true;
            return;
        }

        size_t place_value = place(x);
        while (buf[place_value] && buf[place_value] != x)
        {
            ++place_value;
            place_value &= mask();

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
            ++collisions;
#endif
        }

        if (buf[place_value] == x)
            return;

        buf[place_value] = x;
        ++m_size;
    }

    /** Insert a value into the new buffer that was in the old buffer.
      * Used when increasing the size of the buffer, as well as when reading from a file.
      * Assumes x is non-zero and not already present (so no duplicate check is needed),
      * and does not update m_size.
      */
    void reinsertImpl(HashValue x)
    {
        size_t place_value = place(x);
        while (buf[place_value])
        {
            ++place_value;
            place_value &= mask();

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
            ++collisions;
#endif
        }

        buf[place_value] = x;
    }

    /** If the hash table is full enough, then do resize.
      * If there are too many items, then throw half the pieces until they are small enough:
      * raise skip_degree (and rehash) until at most UNIQUES_HASH_MAX_SIZE elements remain.
      */
    void shrinkIfNeed()
    {
        if (unlikely(m_size > max_fill()))
        {
            if (m_size > UNIQUES_HASH_MAX_SIZE)
            {
                while (m_size > UNIQUES_HASH_MAX_SIZE)
                {
                    ++skip_degree;
                    rehash();
                }
            }
            else
                resize();
        }
    }


public:
    using value_type = Value;

    /// Create an empty set with the initial buffer size (2^UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE cells).
    UniquesHashSet() :
        m_size(0),
        skip_degree(0),
        has_zero(false)
    {
        alloc(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE);
#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
        collisions = 0;
#endif
    }

    /// Deep copy: allocates a buffer of the same size and copies all cells.
    UniquesHashSet(const UniquesHashSet & rhs)
        : m_size(rhs.m_size), skip_degree(rhs.skip_degree), has_zero(rhs.has_zero)
    {
        alloc(rhs.size_degree);
        memcpy(buf, rhs.buf, buf_size() * sizeof(buf[0]));
    }

    /// Deep-copy assignment. The buffer is reallocated only when the sizes differ.
    /// NOTE(review): on self-assignment size_degree matches, so this performs a memcpy
    /// with identical source and destination — works in practice, but a self-assignment
    /// check would be formally cleaner; confirm before changing.
    UniquesHashSet & operator= (const UniquesHashSet & rhs)
    {
        if (size_degree != rhs.size_degree)
        {
            free();
            alloc(rhs.size_degree);
        }

        m_size = rhs.m_size;
        skip_degree = rhs.skip_degree;
        has_zero = rhs.has_zero;

        memcpy(buf, rhs.buf, buf_size() * sizeof(buf[0]));

        return *this;
    }

    ~UniquesHashSet()
    {
        free();
    }

    /// Insert a key. The key is hashed; hashes filtered out by the current skip_degree
    /// are silently discarded (that is the thinning that makes the count approximate).
    void insert(Value x)
    {
        HashValue hash_value = hash(x);
        if (!good(hash_value))
            return;

        insertImpl(hash_value);
        shrinkIfNeed();
    }

    /// Approximate number of distinct inserted values.
    size_t size() const
    {
        if (0 == skip_degree)
            return m_size;

        /// Each stored element represents about 2^skip_degree original ones.
        size_t res = m_size * (1ULL << skip_degree);

        /** Pseudo-random remainder - in order to be not visible,
          * that the number is divided by the power of two.
          */
        res += (intHashCRC32(m_size) & ((1ULL << skip_degree) - 1));

        /** Correction of a systematic error due to collisions during hashing in UInt32.
          * `fixed_res(res)` formula
          * - with how many different elements of fixed_res,
          *   when randomly scattered across 2^32 buckets,
          *   filled buckets with average of res is obtained.
          */
        size_t p32 = 1ULL << 32;
        size_t fixed_res = round(p32 * (log(p32) - log(p32 - res)));
        return fixed_res;
    }

    /// Merge another set into this one, as if all elements of both had been inserted here.
    /// First aligns skip_degree to the stricter of the two (rehashing if necessary).
    void merge(const UniquesHashSet & rhs)
    {
        if (rhs.skip_degree > skip_degree)
        {
            skip_degree = rhs.skip_degree;
            rehash();
        }

        if (!has_zero && rhs.has_zero)
        {
            has_zero = true;
            ++m_size;
            shrinkIfNeed();
        }

        for (size_t i = 0; i < rhs.buf_size(); ++i)
        {
            /// Elements of rhs may still fail this set's (possibly stricter) thinning filter.
            if (rhs.buf[i] && good(rhs.buf[i]))
            {
                insertImpl(rhs.buf[i]);
                shrinkIfNeed();
            }
        }
    }

    /// Binary serialization: skip_degree, var-length element count, then every stored hash
    /// (including an explicit 0 when has_zero is set).
    void write(DB::WriteBuffer & wb) const
    {
        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot write UniquesHashSet: too large size_degree.");

        DB::writeIntBinary(skip_degree, wb);
        DB::writeVarUInt(m_size, wb);

        if (has_zero)
        {
            HashValue x = 0;
            DB::writeIntBinary(x, wb);
        }

        for (size_t i = 0; i < buf_size(); ++i)
            if (buf[i])
                DB::writeIntBinary(buf[i], wb);
    }

    /// Binary deserialization: replaces the current contents with the serialized set.
    void read(DB::ReadBuffer & rb)
    {
        has_zero = false;

        DB::readIntBinary(skip_degree, rb);
        DB::readVarUInt(m_size, rb);

        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        free();

        /// Pick a buffer degree large enough to keep the fill factor below 50%.
        UInt8 new_size_degree = m_size <= 1
             ? UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE
             : std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(m_size - 1)) + 2);

        alloc(new_size_degree);

        for (size_t i = 0; i < m_size; ++i)
        {
            HashValue x = 0;
            DB::readIntBinary(x, rb);
            if (x == 0)
                has_zero = true;
            else
                reinsertImpl(x);
        }
    }

    /// Read a serialized set and merge it into this one, without materializing
    /// a temporary UniquesHashSet.
    void readAndMerge(DB::ReadBuffer & rb)
    {
        UInt8 rhs_skip_degree = 0;
        DB::readIntBinary(rhs_skip_degree, rb);

        if (rhs_skip_degree > skip_degree)
        {
            skip_degree = rhs_skip_degree;
            rehash();
        }

        size_t rhs_size = 0;
        DB::readVarUInt(rhs_size, rb);

        if (rhs_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        /// Grow the buffer up front so that the incoming elements fit.
        if ((1ULL << size_degree) < rhs_size)
        {
            UInt8 new_size_degree = std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(rhs_size - 1)) + 2);
            resize(new_size_degree);
        }

        for (size_t i = 0; i < rhs_size; ++i)
        {
            HashValue x = 0;
            DB::readIntBinary(x, rb);
            /// insertHash applies the good() filter, so elements thinned out by
            /// this set's skip_degree are discarded.
            insertHash(x);
        }
    }

    /// Skip over a serialized set in the buffer without deserializing it.
    static void skip(DB::ReadBuffer & rb)
    {
        size_t size = 0;

        rb.ignore(); /// The one-byte skip_degree.
        DB::readVarUInt(size, rb);

        if (size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        rb.ignore(sizeof(HashValue) * size);
    }

    /// Text serialization: comma-separated skip_degree, element count, then every stored hash.
    void writeText(DB::WriteBuffer & wb) const
    {
        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot write UniquesHashSet: too large size_degree.");

        DB::writeIntText(skip_degree, wb);
        wb.write(",", 1);
        DB::writeIntText(m_size, wb);

        if (has_zero)
            wb.write(",0", 2);

        for (size_t i = 0; i < buf_size(); ++i)
        {
            if (buf[i])
            {
                wb.write(",", 1);
                DB::writeIntText(buf[i], wb);
            }
        }
    }

    /// Text deserialization: replaces the current contents with the serialized set.
    void readText(DB::ReadBuffer & rb)
    {
        has_zero = false;

        DB::readIntText(skip_degree, rb);
        DB::assertChar(',', rb);
        DB::readIntText(m_size, rb);

        if (m_size > UNIQUES_HASH_MAX_SIZE)
            throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");

        free();

        /// Pick a buffer degree large enough to keep the fill factor below 50%.
        UInt8 new_size_degree = m_size <= 1
             ? UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE
             : std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(m_size - 1)) + 2);

        alloc(new_size_degree);

        for (size_t i = 0; i < m_size; ++i)
        {
            HashValue x = 0;
            DB::assertChar(',', rb);
            DB::readIntText(x, rb);
            if (x == 0)
                has_zero = true;
            else
                reinsertImpl(x);
        }
    }

    /// Insert an already-computed hash value (used when merging serialized states).
    /// Hashes filtered out by the current skip_degree are discarded.
    void insertHash(HashValue hash_value)
    {
        if (!good(hash_value))
            return;

        insertImpl(hash_value);
        shrinkIfNeed();
    }

#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
    /// Number of linear-probing collisions observed (profiling only).
    size_t getCollisions() const
    {
        return collisions;
    }
#endif
};
548
549
550#undef UNIQUES_HASH_MAX_SIZE_DEGREE
551#undef UNIQUES_HASH_MAX_SIZE
552#undef UNIQUES_HASH_BITS_FOR_SKIP
553#undef UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE
554