#pragma once

#include <iostream>
#include <vector>

#include <boost/range/adaptor/reversed.hpp>

#include <Common/ArenaWithFreeLists.h>
#include <Common/UInt128.h>
#include <Common/HashTable/Hash.h>
#include <Common/HashTable/HashMap.h>

#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
/*
 * Implementation of Filtered Space-Saving for TopK streaming analysis.
 * http://www.l2f.inesc-id.pt/~fmmb/wiki/uploads/Work/misnis.ref0a.pdf
 * It also implements the reduce-and-combine algorithm suggested in Parallel Space Saving:
 * https://arxiv.org/pdf/1401.0702.pdf
 */
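
/*
 * Usage sketch (illustrative only, not part of this header): track the five
 * most frequent strings in a stream. `words` is a hypothetical input vector;
 * the rest assumes the surrounding ClickHouse build environment.
 *
 *     DB::SpaceSaving<StringRef, StringRefHash> top(5);
 *     for (const auto & word : words)
 *         top.insert(word);
 *     for (const auto & counter : top.topK(5))
 *         std::cout << counter.key.toString() << ": " << counter.count << "\n";
 */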

namespace DB
{

/*
 * Arena interface to allow specialized storage of keys.
 * POD keys do not require additional storage, so this interface is empty.
 */
template <typename TKey>
struct SpaceSavingArena
{
    SpaceSavingArena() {}
    const TKey emplace(const TKey & key) { return key; }
    void free(const TKey & /*key*/) {}
};

/*
 * Specialized storage for StringRef with a freelist arena.
 * Keys of this type that are retained on insertion must be serialized into local storage,
 * otherwise the reference would be invalid after the processed block is released.
 */
template <>
struct SpaceSavingArena<StringRef>
{
    const StringRef emplace(const StringRef & key)
    {
        auto ptr = arena.alloc(key.size);
        std::copy(key.data, key.data + key.size, ptr);
        return StringRef{ptr, key.size};
    }

    void free(const StringRef & key)
    {
        if (key.data)
            arena.free(const_cast<char *>(key.data), key.size);
    }

private:
    ArenaWithFreeLists arena;
};
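
/*
 * Why the copy above matters, as a sketch (hypothetical code): a StringRef
 * handed to insert() may point into a block that is freed soon after, so
 * the arena must own a private copy of the bytes.
 *
 *     SpaceSavingArena<StringRef> arena;
 *     StringRef stored;
 *     {
 *         std::string transient = "ephemeral key";
 *         stored = arena.emplace(StringRef{transient.data(), transient.size()});
 *     }
 *     // `transient` is gone, but `stored` still points into the arena.
 *     arena.free(stored);
 */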


template
<
    typename TKey,
    typename Hash = DefaultHash<TKey>,
    typename Grower = HashTableGrower<>,
    typename Allocator = HashTableAllocator
>
class SpaceSaving
{
private:
    // Constants suggested in the paper "Finding top-k elements in data streams", chapter 6, equation (24).
    // Round up to a power of 2 for cheaper binning without modulo.
    constexpr uint64_t nextAlphaSize(uint64_t x)
    {
        constexpr uint64_t ALPHA_MAP_ELEMENTS_PER_COUNTER = 6;
        return 1ULL << (sizeof(uint64_t) * 8 - __builtin_clzll(x * ALPHA_MAP_ELEMENTS_PER_COUNTER));
    }
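
    /*
     * Worked example (values chosen for illustration): for capacity c = 10,
     * x * 6 = 60 = 0b111100, __builtin_clzll(60) = 58, and
     * 1ULL << (64 - 58) = 64, so the alpha map gets 64 slots: the first
     * power of two above 60 (six slots per counter).
     */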

public:
    using Self = SpaceSaving;

    struct Counter
    {
        Counter() {}

        Counter(const TKey & k, UInt64 c = 0, UInt64 e = 0, size_t h = 0)
            : key(k), slot(0), hash(h), count(c), error(e) {}

        void write(WriteBuffer & wb) const
        {
            writeBinary(key, wb);
            writeVarUInt(count, wb);
            writeVarUInt(error, wb);
        }

        void read(ReadBuffer & rb)
        {
            readBinary(key, rb);
            readVarUInt(count, rb);
            readVarUInt(error, rb);
        }

        // greater() that breaks count ties by preferring the smaller estimation error
        bool operator> (const Counter & b) const
        {
            return (count > b.count) || (count == b.count && error < b.error);
        }

        TKey key;
        size_t slot;
        size_t hash;
        UInt64 count;
        UInt64 error;
    };
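
    /*
     * Ordering sketch (hypothetical keys): when counts tie, the counter with
     * the smaller accumulated error wins, because its count is more trustworthy.
     *
     *     Counter a(key_a, 5, 1);  // count = 5, error = 1
     *     Counter b(key_b, 5, 2);  // count = 5, error = 2
     *     assert(a > b);
     */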

    SpaceSaving(size_t c = 10) : alpha_map(nextAlphaSize(c)), m_capacity(c) {}

    ~SpaceSaving() { destroyElements(); }

    inline size_t size() const
    {
        return counter_list.size();
    }

    inline size_t capacity() const
    {
        return m_capacity;
    }

    void clear()
    {
        return destroyElements();
    }

    void resize(size_t new_capacity)
    {
        counter_list.reserve(new_capacity);
        alpha_map.resize(nextAlphaSize(new_capacity));
        m_capacity = new_capacity;
    }

    void insert(const TKey & key, UInt64 increment = 1, UInt64 error = 0)
    {
        // Increase the weight of a key that already exists
        auto hash = counter_map.hash(key);
        auto counter = findCounter(key, hash);
        if (counter)
        {
            counter->count += increment;
            counter->error += error;
            percolate(counter);
            return;
        }
        // Key doesn't exist, but can fit in the top K
        else if (unlikely(size() < capacity()))
        {
            auto c = new Counter(arena.emplace(key), increment, error, hash);
            push(c);
            return;
        }

        auto min = counter_list.back();
        // The key doesn't exist and cannot fit in the current top K, but
        // its weight is larger than the count of the least present element
        // in the set, so it displaces that element outright. This path
        // matters for the topKWeighted function.
        if (increment > min->count)
        {
            destroyLastElement();
            push(new Counter(arena.emplace(key), increment, error, hash));
            return;
        }

        const size_t alpha_mask = alpha_map.size() - 1;
        auto & alpha = alpha_map[hash & alpha_mask];
        if (alpha + increment < min->count)
        {
            alpha += increment;
            return;
        }

        // Erase the current minimum element
        alpha_map[min->hash & alpha_mask] = min->count;
        destroyLastElement();

        push(new Counter(arena.emplace(key), alpha + increment, alpha + error, hash));
    }
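
    /*
     * Illustrative trace of the eviction path (hypothetical values): with
     * capacity 2, counters {a: 10, b: 4}, and alpha = 3 in the new key's
     * bucket, insert(c) with increment 1 finds alpha + 1 >= min->count, so
     * b is evicted, b's count (4) is remembered as the alpha of b's bucket,
     * and c enters with count = alpha + 1 = 4 and error = alpha = 3.
     */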

    /*
     * Parallel Space Saving reduction and combine step from:
     * https://arxiv.org/pdf/1401.0702.pdf
     */
    void merge(const Self & rhs)
    {
        UInt64 m1 = 0;
        UInt64 m2 = 0;

        if (size() == capacity())
        {
            m1 = counter_list.back()->count;
        }

        if (rhs.size() == rhs.capacity())
        {
            m2 = rhs.counter_list.back()->count;
        }

        /*
         * The algorithm below mutates the current table in place, without
         * mutating the rhs table or creating a new one: the first sweep
         * assumes no elements overlap, and the second sweep corrects the
         * error where they do.
         */
        if (m2 > 0)
        {
            for (auto counter : counter_list)
            {
                counter->count += m2;
                counter->error += m2;
            }
        }

        // The list is sorted in descending order, we have to scan in reverse
        for (auto counter : boost::adaptors::reverse(rhs.counter_list))
        {
            size_t hash = counter_map.hash(counter->key);
            if (auto current = findCounter(counter->key, hash))
            {
                // Subtract m2 previously added, guaranteed not negative
                current->count += (counter->count - m2);
                current->error += (counter->error - m2);
            }
            else
            {
                // Counters not monitored in S1
                counter_list.push_back(new Counter(arena.emplace(counter->key), counter->count + m1, counter->error + m1, hash));
            }
        }

        std::sort(counter_list.begin(), counter_list.end(), [](Counter * l, Counter * r) { return *l > *r; });

        if (counter_list.size() > m_capacity)
        {
            for (size_t i = m_capacity; i < counter_list.size(); ++i)
            {
                arena.free(counter_list[i]->key);
                delete counter_list[i];
            }
            counter_list.resize(m_capacity);
        }

        for (size_t i = 0; i < counter_list.size(); ++i)
            counter_list[i]->slot = i;
        rebuildCounterMap();
    }
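
    /*
     * Merge example (hypothetical values): S1 = {a: 10, b: 6} (full, m1 = 6)
     * merged with S2 = {a: 4, c: 3} (full, m2 = 3). The first sweep adds m2
     * to every S1 counter: {a: 13, b: 9}. The second sweep finds a in both
     * tables and replaces the pessimistic m2 by a's true S2 count:
     * a = 13 + (4 - 3) = 14. c is unmonitored in S1, so it enters with
     * count = 3 + m1 = 9, the upper bound on what it could have accumulated there.
     */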

    std::vector<Counter> topK(size_t k) const
    {
        std::vector<Counter> res;
        for (auto counter : counter_list)
        {
            res.push_back(*counter);
            if (res.size() == k)
                break;
        }
        return res;
    }

    void write(WriteBuffer & wb) const
    {
        writeVarUInt(size(), wb);
        for (auto counter : counter_list)
            counter->write(wb);

        writeVarUInt(alpha_map.size(), wb);
        for (auto alpha : alpha_map)
            writeVarUInt(alpha, wb);
    }

    void read(ReadBuffer & rb)
    {
        destroyElements();
        size_t count = 0;
        readVarUInt(count, rb);

        for (size_t i = 0; i < count; ++i)
        {
            auto counter = new Counter();
            counter->read(rb);
            counter->hash = counter_map.hash(counter->key);
            push(counter);
        }

        readAlphaMap(rb);
    }

    void readAlphaMap(ReadBuffer & rb)
    {
        size_t alpha_size = 0;
        readVarUInt(alpha_size, rb);
        for (size_t i = 0; i < alpha_size; ++i)
        {
            UInt64 alpha = 0;
            readVarUInt(alpha, rb);
            alpha_map.push_back(alpha);
        }
    }
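
    /*
     * Serialization round-trip sketch (hypothetical; assumes
     * WriteBufferFromOwnString and ReadBufferFromString from IO/):
     *
     *     SpaceSaving<UInt64> source(10);
     *     source.insert(42);
     *     WriteBufferFromOwnString out;
     *     source.write(out);
     *     ReadBufferFromString in(out.str());
     *     SpaceSaving<UInt64> restored(10);
     *     restored.read(in);
     */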

protected:
    void push(Counter * counter)
    {
        counter->slot = counter_list.size();
        counter_list.push_back(counter);
        counter_map[counter->key] = counter;
        percolate(counter);
    }

    // This is equivalent to one step of bubble sort
    void percolate(Counter * counter)
    {
        while (counter->slot > 0)
        {
            auto next = counter_list[counter->slot - 1];
            if (*counter > *next)
            {
                std::swap(next->slot, counter->slot);
                std::swap(counter_list[next->slot], counter_list[counter->slot]);
            }
            else
                break;
        }
    }
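
    /*
     * Percolation example (hypothetical counts): counter_list = [a: 9, b: 5, c: 4].
     * After c is incremented to 7, percolate(c) swaps it with b, giving
     * [a: 9, c: 7, b: 5], and stops because a's count is still larger.
     */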

private:
    void destroyElements()
    {
        for (auto counter : counter_list)
        {
            arena.free(counter->key);
            delete counter;
        }

        counter_map.clear();
        counter_list.clear();
        alpha_map.clear();
    }

    void destroyLastElement()
    {
        auto last_element = counter_list.back();
        arena.free(last_element->key);
        delete last_element;
        counter_list.pop_back();

        ++removed_keys;
        if (removed_keys * 2 > counter_map.size())
            rebuildCounterMap();
    }

    Counter * findCounter(const TKey & key, size_t hash)
    {
        auto it = counter_map.find(key, hash);
        if (!it)
            return nullptr;

        return it->getMapped();
    }

    void rebuildCounterMap()
    {
        removed_keys = 0;
        counter_map.clear();
        for (auto counter : counter_list)
            counter_map[counter->key] = counter;
    }

    using CounterMap = HashMap<TKey, Counter *, Hash, Grower, Allocator>;

    CounterMap counter_map;
    std::vector<Counter *> counter_list;
    std::vector<UInt64> alpha_map;
    SpaceSavingArena<TKey> arena;
    size_t m_capacity;
    size_t removed_keys = 0;
};

}