1 | #pragma once |
2 | |
3 | #include <atomic> |
4 | #include <chrono> |
5 | #include <map> |
6 | #include <shared_mutex> |
7 | #include <variant> |
8 | #include <vector> |
9 | #include <Columns/ColumnDecimal.h> |
10 | #include <Columns/ColumnString.h> |
11 | #include <pcg_random.hpp> |
12 | #include <Common/ArenaWithFreeLists.h> |
13 | #include <Common/HashTable/HashMap.h> |
14 | #include <Common/ProfilingScopedRWLock.h> |
15 | #include <Common/SmallObjectPool.h> |
16 | #include <common/StringRef.h> |
17 | #include <ext/bit_cast.h> |
18 | #include <ext/map.h> |
19 | #include <ext/scope_guard.h> |
20 | #include "DictionaryStructure.h" |
21 | #include "IDictionary.h" |
22 | #include "IDictionarySource.h" |
23 | #include <DataStreams/IBlockInputStream.h> |
24 | |
25 | |
26 | namespace ProfileEvents |
27 | { |
28 | extern const Event DictCacheKeysRequested; |
29 | extern const Event DictCacheKeysRequestedMiss; |
30 | extern const Event DictCacheKeysRequestedFound; |
31 | extern const Event DictCacheKeysExpired; |
32 | extern const Event DictCacheKeysNotFound; |
33 | extern const Event DictCacheKeysHit; |
34 | extern const Event DictCacheRequestTimeNs; |
35 | extern const Event DictCacheLockWriteNs; |
36 | extern const Event DictCacheLockReadNs; |
37 | } |
38 | |
39 | namespace DB |
40 | { |
41 | class ComplexKeyCacheDictionary final : public IDictionaryBase |
42 | { |
43 | public: |
44 | ComplexKeyCacheDictionary( |
45 | const std::string & database_, |
46 | const std::string & name_, |
47 | const DictionaryStructure & dict_struct_, |
48 | DictionarySourcePtr source_ptr_, |
49 | const DictionaryLifetime dict_lifetime_, |
50 | const size_t size_); |
51 | |
52 | std::string getKeyDescription() const { return key_description; } |
53 | |
54 | const std::string & getDatabase() const override { return database; } |
55 | const std::string & getName() const override { return name; } |
56 | const std::string & getFullName() const override { return full_name; } |
57 | |
58 | std::string getTypeName() const override { return "ComplexKeyCache" ; } |
59 | |
60 | size_t getBytesAllocated() const override |
61 | { |
62 | return bytes_allocated + (key_size_is_fixed ? fixed_size_keys_pool->size() : keys_pool->size()) |
63 | + (string_arena ? string_arena->size() : 0); |
64 | } |
65 | |
66 | size_t getQueryCount() const override { return query_count.load(std::memory_order_relaxed); } |
67 | |
68 | double getHitRate() const override |
69 | { |
70 | return static_cast<double>(hit_count.load(std::memory_order_acquire)) / query_count.load(std::memory_order_relaxed); |
71 | } |
72 | |
73 | size_t getElementCount() const override { return element_count.load(std::memory_order_relaxed); } |
74 | |
75 | double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; } |
76 | |
77 | bool supportUpdates() const override { return false; } |
78 | |
79 | std::shared_ptr<const IExternalLoadable> clone() const override |
80 | { |
81 | return std::make_shared<ComplexKeyCacheDictionary>(database, name, dict_struct, source_ptr->clone(), dict_lifetime, size); |
82 | } |
83 | |
84 | const IDictionarySource * getSource() const override { return source_ptr.get(); } |
85 | |
86 | const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } |
87 | |
88 | const DictionaryStructure & getStructure() const override { return dict_struct; } |
89 | |
90 | bool isInjective(const std::string & attribute_name) const override |
91 | { |
92 | return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective; |
93 | } |
94 | |
95 | template <typename T> |
96 | using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>; |
97 | |
98 | /// In all functions below, key_columns must be full (non-constant) columns. |
99 | /// See the requirement in IDataType.h for text-serialization functions. |
100 | #define DECLARE(TYPE) \ |
101 | void get##TYPE( \ |
102 | const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ResultArrayType<TYPE> & out) const; |
103 | DECLARE(UInt8) |
104 | DECLARE(UInt16) |
105 | DECLARE(UInt32) |
106 | DECLARE(UInt64) |
107 | DECLARE(UInt128) |
108 | DECLARE(Int8) |
109 | DECLARE(Int16) |
110 | DECLARE(Int32) |
111 | DECLARE(Int64) |
112 | DECLARE(Float32) |
113 | DECLARE(Float64) |
114 | DECLARE(Decimal32) |
115 | DECLARE(Decimal64) |
116 | DECLARE(Decimal128) |
117 | #undef DECLARE |
118 | |
119 | void getString(const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const; |
120 | |
121 | #define DECLARE(TYPE) \ |
122 | void get##TYPE( \ |
123 | const std::string & attribute_name, \ |
124 | const Columns & key_columns, \ |
125 | const DataTypes & key_types, \ |
126 | const PaddedPODArray<TYPE> & def, \ |
127 | ResultArrayType<TYPE> & out) const; |
128 | DECLARE(UInt8) |
129 | DECLARE(UInt16) |
130 | DECLARE(UInt32) |
131 | DECLARE(UInt64) |
132 | DECLARE(UInt128) |
133 | DECLARE(Int8) |
134 | DECLARE(Int16) |
135 | DECLARE(Int32) |
136 | DECLARE(Int64) |
137 | DECLARE(Float32) |
138 | DECLARE(Float64) |
139 | DECLARE(Decimal32) |
140 | DECLARE(Decimal64) |
141 | DECLARE(Decimal128) |
142 | #undef DECLARE |
143 | |
144 | void getString( |
145 | const std::string & attribute_name, |
146 | const Columns & key_columns, |
147 | const DataTypes & key_types, |
148 | const ColumnString * const def, |
149 | ColumnString * const out) const; |
150 | |
151 | #define DECLARE(TYPE) \ |
152 | void get##TYPE( \ |
153 | const std::string & attribute_name, \ |
154 | const Columns & key_columns, \ |
155 | const DataTypes & key_types, \ |
156 | const TYPE def, \ |
157 | ResultArrayType<TYPE> & out) const; |
158 | DECLARE(UInt8) |
159 | DECLARE(UInt16) |
160 | DECLARE(UInt32) |
161 | DECLARE(UInt64) |
162 | DECLARE(UInt128) |
163 | DECLARE(Int8) |
164 | DECLARE(Int16) |
165 | DECLARE(Int32) |
166 | DECLARE(Int64) |
167 | DECLARE(Float32) |
168 | DECLARE(Float64) |
169 | DECLARE(Decimal32) |
170 | DECLARE(Decimal64) |
171 | DECLARE(Decimal128) |
172 | #undef DECLARE |
173 | |
174 | void getString( |
175 | const std::string & attribute_name, |
176 | const Columns & key_columns, |
177 | const DataTypes & key_types, |
178 | const String & def, |
179 | ColumnString * const out) const; |
180 | |
181 | void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const; |
182 | |
183 | BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; |
184 | |
185 | private: |
186 | template <typename Value> |
187 | using MapType = HashMapWithSavedHash<StringRef, Value, StringRefHash>; |
188 | template <typename Value> |
189 | using ContainerType = Value[]; |
190 | template <typename Value> |
191 | using ContainerPtrType = std::unique_ptr<ContainerType<Value>>; |
192 | |
193 | struct CellMetadata final |
194 | { |
195 | using time_point_t = std::chrono::system_clock::time_point; |
196 | using time_point_rep_t = time_point_t::rep; |
197 | using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>; |
198 | |
199 | static constexpr UInt64 EXPIRES_AT_MASK = std::numeric_limits<time_point_rep_t>::max(); |
200 | static constexpr UInt64 IS_DEFAULT_MASK = ~EXPIRES_AT_MASK; |
201 | |
202 | StringRef key; |
203 | decltype(StringRefHash{}(key)) hash; |
204 | /// Stores both expiration time and `is_default` flag in the most significant bit |
205 | time_point_urep_t data; |
206 | |
207 | /// Sets expiration time, resets `is_default` flag to false |
208 | time_point_t expiresAt() const { return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK); } |
209 | void setExpiresAt(const time_point_t & t) { data = ext::safe_bit_cast<time_point_urep_t>(t); } |
210 | |
211 | bool isDefault() const { return (data & IS_DEFAULT_MASK) == IS_DEFAULT_MASK; } |
212 | void setDefault() { data |= IS_DEFAULT_MASK; } |
213 | }; |
214 | |
215 | struct Attribute final |
216 | { |
217 | AttributeUnderlyingType type; |
218 | std::variant< |
219 | UInt8, |
220 | UInt16, |
221 | UInt32, |
222 | UInt64, |
223 | UInt128, |
224 | Int8, |
225 | Int16, |
226 | Int32, |
227 | Int64, |
228 | Decimal32, |
229 | Decimal64, |
230 | Decimal128, |
231 | Float32, |
232 | Float64, |
233 | String> |
234 | null_values; |
235 | std::variant< |
236 | ContainerPtrType<UInt8>, |
237 | ContainerPtrType<UInt16>, |
238 | ContainerPtrType<UInt32>, |
239 | ContainerPtrType<UInt64>, |
240 | ContainerPtrType<UInt128>, |
241 | ContainerPtrType<Int8>, |
242 | ContainerPtrType<Int16>, |
243 | ContainerPtrType<Int32>, |
244 | ContainerPtrType<Int64>, |
245 | ContainerPtrType<Decimal32>, |
246 | ContainerPtrType<Decimal64>, |
247 | ContainerPtrType<Decimal128>, |
248 | ContainerPtrType<Float32>, |
249 | ContainerPtrType<Float64>, |
250 | ContainerPtrType<StringRef>> |
251 | arrays; |
252 | }; |
253 | |
254 | void createAttributes(); |
255 | |
256 | Attribute createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value); |
257 | |
258 | template <typename AttributeType, typename OutputType, typename DefaultGetter> |
259 | void getItemsNumberImpl( |
260 | Attribute & attribute, const Columns & key_columns, PaddedPODArray<OutputType> & out, DefaultGetter && get_default) const |
261 | { |
262 | /// Mapping: <key> -> { all indices `i` of `key_columns` such that `key_columns[i]` = <key> } |
263 | MapType<std::vector<size_t>> outdated_keys; |
264 | auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays); |
265 | |
266 | const auto rows_num = key_columns.front()->size(); |
267 | const auto keys_size = dict_struct.key->size(); |
268 | StringRefs keys(keys_size); |
269 | Arena temporary_keys_pool; |
270 | PODArray<StringRef> keys_array(rows_num); |
271 | |
272 | size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; |
273 | { |
274 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
275 | |
276 | const auto now = std::chrono::system_clock::now(); |
277 | /// fetch up-to-date values, decide which ones require update |
278 | for (const auto row : ext::range(0, rows_num)) |
279 | { |
280 | const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); |
281 | keys_array[row] = key; |
282 | const auto find_result = findCellIdx(key, now); |
283 | |
284 | /** cell should be updated if either: |
285 | * 1. keys (or hash) do not match, |
286 | * 2. cell has expired, |
287 | * 3. explicit defaults were specified and cell was set default. */ |
288 | |
289 | if (!find_result.valid) |
290 | { |
291 | outdated_keys[key].push_back(row); |
292 | if (find_result.outdated) |
293 | ++cache_expired; |
294 | else |
295 | ++cache_not_found; |
296 | } |
297 | else |
298 | { |
299 | ++cache_hit; |
300 | const auto & cell_idx = find_result.cell_idx; |
301 | const auto & cell = cells[cell_idx]; |
302 | out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]); |
303 | } |
304 | } |
305 | } |
306 | ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); |
307 | ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); |
308 | ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); |
309 | query_count.fetch_add(rows_num, std::memory_order_relaxed); |
310 | hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); |
311 | |
312 | if (outdated_keys.empty()) |
313 | return; |
314 | |
315 | std::vector<size_t> required_rows(outdated_keys.size()); |
316 | std::transform( |
317 | std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), [](auto & pair) { return pair.getMapped().front(); }); |
318 | |
319 | /// request new values |
320 | update( |
321 | key_columns, |
322 | keys_array, |
323 | required_rows, |
324 | [&](const StringRef key, const size_t cell_idx) |
325 | { |
326 | for (const auto row : outdated_keys[key]) |
327 | out[row] = static_cast<OutputType>(attribute_array[cell_idx]); |
328 | }, |
329 | [&](const StringRef key, const size_t) |
330 | { |
331 | for (const auto row : outdated_keys[key]) |
332 | out[row] = get_default(row); |
333 | }); |
334 | } |
335 | |
336 | template <typename DefaultGetter> |
337 | void getItemsString(Attribute & attribute, const Columns & key_columns, ColumnString * out, DefaultGetter && get_default) const |
338 | { |
339 | const auto rows_num = key_columns.front()->size(); |
340 | /// save on some allocations |
341 | out->getOffsets().reserve(rows_num); |
342 | |
343 | const auto keys_size = dict_struct.key->size(); |
344 | StringRefs keys(keys_size); |
345 | Arena temporary_keys_pool; |
346 | |
347 | auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays); |
348 | |
349 | auto found_outdated_values = false; |
350 | |
351 | /// perform optimistic version, fallback to pessimistic if failed |
352 | { |
353 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
354 | |
355 | const auto now = std::chrono::system_clock::now(); |
356 | /// fetch up-to-date values, discard on fail |
357 | for (const auto row : ext::range(0, rows_num)) |
358 | { |
359 | const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); |
360 | SCOPE_EXIT(temporary_keys_pool.rollback(key.size)); |
361 | const auto find_result = findCellIdx(key, now); |
362 | |
363 | if (!find_result.valid) |
364 | { |
365 | found_outdated_values = true; |
366 | break; |
367 | } |
368 | else |
369 | { |
370 | const auto & cell_idx = find_result.cell_idx; |
371 | const auto & cell = cells[cell_idx]; |
372 | const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; |
373 | out->insertData(string_ref.data, string_ref.size); |
374 | } |
375 | } |
376 | } |
377 | |
378 | /// optimistic code completed successfully |
379 | if (!found_outdated_values) |
380 | { |
381 | query_count.fetch_add(rows_num, std::memory_order_relaxed); |
382 | hit_count.fetch_add(rows_num, std::memory_order_release); |
383 | return; |
384 | } |
385 | |
386 | /// now onto the pessimistic one, discard possible partial results from the optimistic path |
387 | out->getChars().resize_assume_reserved(0); |
388 | out->getOffsets().resize_assume_reserved(0); |
389 | |
390 | /// Mapping: <key> -> { all indices `i` of `key_columns` such that `key_columns[i]` = <key> } |
391 | MapType<std::vector<size_t>> outdated_keys; |
392 | /// we are going to store every string separately |
393 | MapType<StringRef> map; |
394 | PODArray<StringRef> keys_array(rows_num); |
395 | |
396 | size_t total_length = 0; |
397 | size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; |
398 | { |
399 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
400 | |
401 | const auto now = std::chrono::system_clock::now(); |
402 | for (const auto row : ext::range(0, rows_num)) |
403 | { |
404 | const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); |
405 | keys_array[row] = key; |
406 | const auto find_result = findCellIdx(key, now); |
407 | |
408 | if (!find_result.valid) |
409 | { |
410 | outdated_keys[key].push_back(row); |
411 | if (find_result.outdated) |
412 | ++cache_expired; |
413 | else |
414 | ++cache_not_found; |
415 | } |
416 | else |
417 | { |
418 | ++cache_hit; |
419 | const auto & cell_idx = find_result.cell_idx; |
420 | const auto & cell = cells[cell_idx]; |
421 | const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; |
422 | |
423 | if (!cell.isDefault()) |
424 | map[key] = copyIntoArena(string_ref, temporary_keys_pool); |
425 | |
426 | total_length += string_ref.size + 1; |
427 | } |
428 | } |
429 | } |
430 | ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); |
431 | ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); |
432 | ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); |
433 | |
434 | query_count.fetch_add(rows_num, std::memory_order_relaxed); |
435 | hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); |
436 | |
437 | /// request new values |
438 | if (!outdated_keys.empty()) |
439 | { |
440 | std::vector<size_t> required_rows(outdated_keys.size()); |
441 | std::transform(std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), [](auto & pair) |
442 | { |
443 | return pair.getMapped().front(); |
444 | }); |
445 | |
446 | update( |
447 | key_columns, |
448 | keys_array, |
449 | required_rows, |
450 | [&](const StringRef key, const size_t cell_idx) |
451 | { |
452 | const StringRef attribute_value = attribute_array[cell_idx]; |
453 | |
454 | /// We must copy key and value to own memory, because it may be replaced with another |
455 | /// in next iterations of inner loop of update. |
456 | const StringRef copied_key = copyIntoArena(key, temporary_keys_pool); |
457 | const StringRef copied_value = copyIntoArena(attribute_value, temporary_keys_pool); |
458 | |
459 | map[copied_key] = copied_value; |
460 | total_length += (attribute_value.size + 1) * outdated_keys[key].size(); |
461 | }, |
462 | [&](const StringRef key, const size_t) |
463 | { |
464 | for (const auto row : outdated_keys[key]) |
465 | total_length += get_default(row).size + 1; |
466 | }); |
467 | } |
468 | |
469 | out->getChars().reserve(total_length); |
470 | |
471 | for (const auto row : ext::range(0, ext::size(keys_array))) |
472 | { |
473 | const StringRef key = keys_array[row]; |
474 | const auto it = map.find(key); |
475 | const auto string_ref = it ? it->getMapped() : get_default(row); |
476 | out->insertData(string_ref.data, string_ref.size); |
477 | } |
478 | } |
479 | |
480 | template <typename PresentKeyHandler, typename AbsentKeyHandler> |
481 | void update( |
482 | const Columns & in_key_columns, |
483 | const PODArray<StringRef> & in_keys, |
484 | const std::vector<size_t> & in_requested_rows, |
485 | PresentKeyHandler && on_cell_updated, |
486 | AbsentKeyHandler && on_key_not_found) const |
487 | { |
488 | MapType<bool> remaining_keys{in_requested_rows.size()}; |
489 | for (const auto row : in_requested_rows) |
490 | remaining_keys.insert({in_keys[row], false}); |
491 | |
492 | std::uniform_int_distribution<UInt64> distribution(dict_lifetime.min_sec, dict_lifetime.max_sec); |
493 | |
494 | const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; |
495 | { |
496 | Stopwatch watch; |
497 | auto stream = source_ptr->loadKeys(in_key_columns, in_requested_rows); |
498 | stream->readPrefix(); |
499 | |
500 | const auto keys_size = dict_struct.key->size(); |
501 | StringRefs keys(keys_size); |
502 | |
503 | const auto attributes_size = attributes.size(); |
504 | const auto now = std::chrono::system_clock::now(); |
505 | |
506 | while (const auto block = stream->read()) |
507 | { |
508 | /// cache column pointers |
509 | const auto key_columns = ext::map<Columns>( |
510 | ext::range(0, keys_size), [&](const size_t attribute_idx) { return block.safeGetByPosition(attribute_idx).column; }); |
511 | |
512 | const auto attribute_columns = ext::map<Columns>(ext::range(0, attributes_size), [&](const size_t attribute_idx) |
513 | { |
514 | return block.safeGetByPosition(keys_size + attribute_idx).column; |
515 | }); |
516 | |
517 | const auto rows_num = block.rows(); |
518 | |
519 | for (const auto row : ext::range(0, rows_num)) |
520 | { |
521 | auto key = allocKey(row, key_columns, keys); |
522 | const auto hash = StringRefHash{}(key); |
523 | const auto find_result = findCellIdx(key, now, hash); |
524 | const auto & cell_idx = find_result.cell_idx; |
525 | auto & cell = cells[cell_idx]; |
526 | |
527 | for (const auto attribute_idx : ext::range(0, attributes.size())) |
528 | { |
529 | const auto & attribute_column = *attribute_columns[attribute_idx]; |
530 | auto & attribute = attributes[attribute_idx]; |
531 | |
532 | setAttributeValue(attribute, cell_idx, attribute_column[row]); |
533 | } |
534 | |
535 | /// if cell id is zero and zero does not map to this cell, then the cell is unused |
536 | if (cell.key == StringRef{} && cell_idx != zero_cell_idx) |
537 | element_count.fetch_add(1, std::memory_order_relaxed); |
538 | |
539 | /// handle memory allocated for old key |
540 | if (key == cell.key) |
541 | { |
542 | freeKey(key); |
543 | key = cell.key; |
544 | } |
545 | else |
546 | { |
547 | /// new key is different from the old one |
548 | if (cell.key.data) |
549 | freeKey(cell.key); |
550 | |
551 | cell.key = key; |
552 | } |
553 | |
554 | cell.hash = hash; |
555 | |
556 | if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) |
557 | cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); |
558 | else |
559 | cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max()); |
560 | |
561 | /// inform caller |
562 | on_cell_updated(key, cell_idx); |
563 | /// mark corresponding id as found |
564 | remaining_keys[key] = true; |
565 | } |
566 | } |
567 | |
568 | stream->readSuffix(); |
569 | |
570 | ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, in_requested_rows.size()); |
571 | ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); |
572 | } |
573 | |
574 | size_t found_num = 0; |
575 | size_t not_found_num = 0; |
576 | |
577 | const auto now = std::chrono::system_clock::now(); |
578 | |
579 | /// Check which ids have not been found and require setting null_value |
580 | for (const auto & key_found_pair : remaining_keys) |
581 | { |
582 | if (key_found_pair.getMapped()) |
583 | { |
584 | ++found_num; |
585 | continue; |
586 | } |
587 | |
588 | ++not_found_num; |
589 | |
590 | auto key = key_found_pair.getKey(); |
591 | const auto hash = StringRefHash{}(key); |
592 | const auto find_result = findCellIdx(key, now, hash); |
593 | const auto & cell_idx = find_result.cell_idx; |
594 | auto & cell = cells[cell_idx]; |
595 | |
596 | /// Set null_value for each attribute |
597 | for (auto & attribute : attributes) |
598 | setDefaultAttributeValue(attribute, cell_idx); |
599 | |
600 | /// Check if cell had not been occupied before and increment element counter if it hadn't |
601 | if (cell.key == StringRef{} && cell_idx != zero_cell_idx) |
602 | element_count.fetch_add(1, std::memory_order_relaxed); |
603 | |
604 | if (key == cell.key) |
605 | key = cell.key; |
606 | else |
607 | { |
608 | if (cell.key.data) |
609 | freeKey(cell.key); |
610 | |
611 | /// copy key from temporary pool |
612 | key = copyKey(key); |
613 | cell.key = key; |
614 | } |
615 | |
616 | cell.hash = hash; |
617 | |
618 | if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) |
619 | cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); |
620 | else |
621 | cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max()); |
622 | |
623 | cell.setDefault(); |
624 | |
625 | /// inform caller that the cell has not been found |
626 | on_key_not_found(key, cell_idx); |
627 | } |
628 | |
629 | ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, found_num); |
630 | ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); |
631 | } |
632 | |
633 | UInt64 getCellIdx(const StringRef key) const; |
634 | |
635 | void setDefaultAttributeValue(Attribute & attribute, const size_t idx) const; |
636 | |
637 | void setAttributeValue(Attribute & attribute, const size_t idx, const Field & value) const; |
638 | |
639 | Attribute & getAttribute(const std::string & attribute_name) const; |
640 | |
641 | StringRef allocKey(const size_t row, const Columns & key_columns, StringRefs & keys) const; |
642 | |
643 | void freeKey(const StringRef key) const; |
644 | |
645 | template <typename Arena> |
646 | static StringRef placeKeysInPool( |
647 | const size_t row, |
648 | const Columns & key_columns, |
649 | StringRefs & keys, |
650 | const std::vector<DictionaryAttribute> & key_attributes, |
651 | Arena & pool); |
652 | |
653 | StringRef placeKeysInFixedSizePool(const size_t row, const Columns & key_columns) const; |
654 | |
655 | static StringRef copyIntoArena(StringRef src, Arena & arena); |
656 | StringRef copyKey(const StringRef key) const; |
657 | |
/// Result of a cell lookup.
struct FindResult
{
    /// Index of the cell the lookup settled on; callers use it both to read a valid cell
    /// and as the slot to overwrite when the lookup was not valid.
    const size_t cell_idx;
    /// True when the cell can be served from the cache as is.
    const bool valid;
    /// Callers count a non-valid result as "expired" when this is set and as "not found" otherwise.
    const bool outdated;
};
664 | |
665 | FindResult findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const; |
666 | FindResult findCellIdx(const StringRef & key, const CellMetadata::time_point_t now) const |
667 | { |
668 | const auto hash = StringRefHash{}(key); |
669 | return findCellIdx(key, now, hash); |
670 | } |
671 | |
672 | bool isEmptyCell(const UInt64 idx) const; |
673 | |
674 | const std::string database; |
675 | const std::string name; |
676 | const std::string full_name; |
677 | const DictionaryStructure dict_struct; |
678 | const DictionarySourcePtr source_ptr; |
679 | const DictionaryLifetime dict_lifetime; |
680 | const std::string key_description{dict_struct.getKeyDescription()}; |
681 | |
682 | mutable std::shared_mutex rw_lock; |
683 | |
684 | /// Actual size will be increased to match power of 2 |
685 | const size_t size; |
686 | |
687 | /// all bits to 1 mask (size - 1) (0b1000 - 1 = 0b111) |
688 | const size_t size_overlap_mask; |
689 | |
690 | /// Max tries to find cell, overlaped with mask: if size = 16 and start_cell=10: will try cells: 10,11,12,13,14,15,0,1,2,3 |
691 | static constexpr size_t max_collision_length = 10; |
692 | |
693 | const UInt64 zero_cell_idx{getCellIdx(StringRef{})}; |
694 | std::map<std::string, size_t> attribute_index_by_name; |
695 | mutable std::vector<Attribute> attributes; |
696 | mutable std::vector<CellMetadata> cells{size}; |
697 | const bool key_size_is_fixed{dict_struct.isKeySizeFixed()}; |
698 | size_t key_size{key_size_is_fixed ? dict_struct.getKeySize() : 0}; |
699 | std::unique_ptr<ArenaWithFreeLists> keys_pool = key_size_is_fixed ? nullptr : std::make_unique<ArenaWithFreeLists>(); |
700 | std::unique_ptr<SmallObjectPool> fixed_size_keys_pool = key_size_is_fixed ? std::make_unique<SmallObjectPool>(key_size) : nullptr; |
701 | std::unique_ptr<ArenaWithFreeLists> string_arena; |
702 | |
703 | mutable pcg64 rnd_engine; |
704 | |
705 | mutable size_t bytes_allocated = 0; |
706 | mutable std::atomic<size_t> element_count{0}; |
707 | mutable std::atomic<size_t> hit_count{0}; |
708 | mutable std::atomic<size_t> query_count{0}; |
709 | |
710 | const std::chrono::time_point<std::chrono::system_clock> creation_time = std::chrono::system_clock::now(); |
711 | }; |
712 | |
713 | } |
714 | |