1 | #include "ComplexKeyCacheDictionary.h" |
2 | #include <Common/Arena.h> |
3 | #include <Common/BitHelpers.h> |
4 | #include <Common/CurrentMetrics.h> |
5 | #include <Common/ProfileEvents.h> |
6 | #include <Common/ProfilingScopedRWLock.h> |
7 | #include <Common/Stopwatch.h> |
8 | #include <Common/randomSeed.h> |
9 | #include <ext/map.h> |
10 | #include <ext/range.h> |
11 | #include "DictionaryBlockInputStream.h" |
12 | #include "DictionaryFactory.h" |
13 | |
14 | |
/// Profile event counters declared here and defined elsewhere.
/// The code visible in this file increments DictCacheKeysExpired, DictCacheKeysNotFound and
/// DictCacheKeysHit, and passes DictCacheLockReadNs to the read-lock guard.
/// NOTE(review): the remaining events are presumably used by parts of this dictionary
/// implemented outside this file - confirm.
namespace ProfileEvents
{
    extern const Event DictCacheKeysRequested;
    extern const Event DictCacheKeysRequestedMiss;
    extern const Event DictCacheKeysRequestedFound;
    extern const Event DictCacheKeysExpired;
    extern const Event DictCacheKeysNotFound;
    extern const Event DictCacheKeysHit;
    extern const Event DictCacheRequestTimeNs;
    extern const Event DictCacheLockWriteNs;
    extern const Event DictCacheLockReadNs;
}
27 | |
/// Metric declared here and defined elsewhere.
/// NOTE(review): not referenced by the code visible in this file; presumably used by the
/// update path implemented in another translation unit or the header - confirm.
namespace CurrentMetrics
{
    extern const Metric DictCacheRequests;
}
32 | |
33 | |
34 | namespace DB |
35 | { |
/// Error codes thrown by this file (declared here, defined elsewhere):
///  - TYPE_MISMATCH: a hierarchical attribute was requested for this dictionary type;
///  - BAD_ARGUMENTS: unknown attribute name or invalid layout configuration;
///  - UNSUPPORTED_METHOD: the source does not support selective load;
///  - TOO_SMALL_BUFFER_SIZE: the configured number of cells is zero.
namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
    extern const int TOO_SMALL_BUFFER_SIZE;
}
43 | |
44 | |
45 | inline UInt64 ComplexKeyCacheDictionary::getCellIdx(const StringRef key) const |
46 | { |
47 | const auto hash = StringRefHash{}(key); |
48 | const auto idx = hash & size_overlap_mask; |
49 | return idx; |
50 | } |
51 | |
52 | |
53 | ComplexKeyCacheDictionary::ComplexKeyCacheDictionary( |
54 | const std::string & database_, |
55 | const std::string & name_, |
56 | const DictionaryStructure & dict_struct_, |
57 | DictionarySourcePtr source_ptr_, |
58 | const DictionaryLifetime dict_lifetime_, |
59 | const size_t size_) |
60 | : database(database_) |
61 | , name(name_) |
62 | , full_name{database_.empty() ? name_ : (database_ + "." + name_)} |
63 | , dict_struct(dict_struct_) |
64 | , source_ptr{std::move(source_ptr_)} |
65 | , dict_lifetime(dict_lifetime_) |
66 | , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))} |
67 | , size_overlap_mask{this->size - 1} |
68 | , rnd_engine(randomSeed()) |
69 | { |
70 | if (!this->source_ptr->supportsSelectiveLoad()) |
71 | throw Exception{full_name + ": source cannot be used with ComplexKeyCacheDictionary" , ErrorCodes::UNSUPPORTED_METHOD}; |
72 | |
73 | createAttributes(); |
74 | } |
75 | |
76 | |
77 | void ComplexKeyCacheDictionary::getString( |
78 | const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const |
79 | { |
80 | dict_struct.validateKeyTypes(key_types); |
81 | |
82 | auto & attribute = getAttribute(attribute_name); |
83 | checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString); |
84 | |
85 | const auto null_value = StringRef{std::get<String>(attribute.null_values)}; |
86 | |
87 | getItemsString(attribute, key_columns, out, [&](const size_t) { return null_value; }); |
88 | } |
89 | |
90 | void ComplexKeyCacheDictionary::getString( |
91 | const std::string & attribute_name, |
92 | const Columns & key_columns, |
93 | const DataTypes & key_types, |
94 | const ColumnString * const def, |
95 | ColumnString * const out) const |
96 | { |
97 | dict_struct.validateKeyTypes(key_types); |
98 | |
99 | auto & attribute = getAttribute(attribute_name); |
100 | checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString); |
101 | |
102 | getItemsString(attribute, key_columns, out, [&](const size_t row) { return def->getDataAt(row); }); |
103 | } |
104 | |
105 | void ComplexKeyCacheDictionary::getString( |
106 | const std::string & attribute_name, |
107 | const Columns & key_columns, |
108 | const DataTypes & key_types, |
109 | const String & def, |
110 | ColumnString * const out) const |
111 | { |
112 | dict_struct.validateKeyTypes(key_types); |
113 | |
114 | auto & attribute = getAttribute(attribute_name); |
115 | checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString); |
116 | |
117 | getItemsString(attribute, key_columns, out, [&](const size_t) { return StringRef{def}; }); |
118 | } |
119 | |
120 | /// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag, |
121 | /// true false found and valid |
122 | /// false true not found (something outdated, maybe our cell) |
123 | /// false false not found (other id stored with valid data) |
124 | /// true true impossible |
125 | /// |
126 | /// todo: split this func to two: find_for_get and find_for_set |
127 | ComplexKeyCacheDictionary::FindResult |
128 | ComplexKeyCacheDictionary::findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const |
129 | { |
130 | auto pos = hash; |
131 | auto oldest_id = pos; |
132 | auto oldest_time = CellMetadata::time_point_t::max(); |
133 | const auto stop = pos + max_collision_length; |
134 | |
135 | for (; pos < stop; ++pos) |
136 | { |
137 | const auto cell_idx = pos & size_overlap_mask; |
138 | const auto & cell = cells[cell_idx]; |
139 | |
140 | if (cell.hash != hash || cell.key != key) |
141 | { |
142 | /// maybe we already found nearest expired cell |
143 | if (oldest_time > now && oldest_time > cell.expiresAt()) |
144 | { |
145 | oldest_time = cell.expiresAt(); |
146 | oldest_id = cell_idx; |
147 | } |
148 | |
149 | continue; |
150 | } |
151 | |
152 | if (cell.expiresAt() < now) |
153 | { |
154 | return {cell_idx, false, true}; |
155 | } |
156 | |
157 | return {cell_idx, true, false}; |
158 | } |
159 | |
160 | oldest_id &= size_overlap_mask; |
161 | return {oldest_id, false, false}; |
162 | } |
163 | |
164 | void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const |
165 | { |
166 | dict_struct.validateKeyTypes(key_types); |
167 | |
168 | /// Mapping: <key> -> { all indices `i` of `key_columns` such that `key_columns[i]` = <key> } |
169 | MapType<std::vector<size_t>> outdated_keys; |
170 | |
171 | |
172 | const auto rows_num = key_columns.front()->size(); |
173 | const auto keys_size = dict_struct.key->size(); |
174 | StringRefs keys(keys_size); |
175 | Arena temporary_keys_pool; |
176 | PODArray<StringRef> keys_array(rows_num); |
177 | |
178 | size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; |
179 | { |
180 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
181 | |
182 | const auto now = std::chrono::system_clock::now(); |
183 | /// fetch up-to-date values, decide which ones require update |
184 | for (const auto row : ext::range(0, rows_num)) |
185 | { |
186 | const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); |
187 | keys_array[row] = key; |
188 | const auto find_result = findCellIdx(key, now); |
189 | const auto & cell_idx = find_result.cell_idx; |
190 | /** cell should be updated if either: |
191 | * 1. keys (or hash) do not match, |
192 | * 2. cell has expired, |
193 | * 3. explicit defaults were specified and cell was set default. */ |
194 | if (!find_result.valid) |
195 | { |
196 | outdated_keys[key].push_back(row); |
197 | if (find_result.outdated) |
198 | ++cache_expired; |
199 | else |
200 | ++cache_not_found; |
201 | } |
202 | else |
203 | { |
204 | ++cache_hit; |
205 | const auto & cell = cells[cell_idx]; |
206 | out[row] = !cell.isDefault(); |
207 | } |
208 | } |
209 | } |
210 | ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); |
211 | ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); |
212 | ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); |
213 | |
214 | query_count.fetch_add(rows_num, std::memory_order_relaxed); |
215 | hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); |
216 | |
217 | if (outdated_keys.empty()) |
218 | return; |
219 | |
220 | std::vector<size_t> required_rows(outdated_keys.size()); |
221 | std::transform( |
222 | std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), [](auto & pair) { return pair.getMapped().front(); }); |
223 | |
224 | /// request new values |
225 | update( |
226 | key_columns, |
227 | keys_array, |
228 | required_rows, |
229 | [&](const StringRef key, const auto) |
230 | { |
231 | for (const auto out_idx : outdated_keys[key]) |
232 | out[out_idx] = true; |
233 | }, |
234 | [&](const StringRef key, const auto) |
235 | { |
236 | for (const auto out_idx : outdated_keys[key]) |
237 | out[out_idx] = false; |
238 | }); |
239 | } |
240 | |
241 | void ComplexKeyCacheDictionary::createAttributes() |
242 | { |
243 | const auto attributes_size = dict_struct.attributes.size(); |
244 | attributes.reserve(attributes_size); |
245 | |
246 | bytes_allocated += size * sizeof(CellMetadata); |
247 | bytes_allocated += attributes_size * sizeof(attributes.front()); |
248 | |
249 | for (const auto & attribute : dict_struct.attributes) |
250 | { |
251 | attribute_index_by_name.emplace(attribute.name, attributes.size()); |
252 | attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value)); |
253 | |
254 | if (attribute.hierarchical) |
255 | throw Exception{full_name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(), |
256 | ErrorCodes::TYPE_MISMATCH}; |
257 | } |
258 | } |
259 | |
260 | ComplexKeyCacheDictionary::Attribute & ComplexKeyCacheDictionary::getAttribute(const std::string & attribute_name) const |
261 | { |
262 | const auto it = attribute_index_by_name.find(attribute_name); |
263 | if (it == std::end(attribute_index_by_name)) |
264 | throw Exception{full_name + ": no such attribute '" + attribute_name + "'" , ErrorCodes::BAD_ARGUMENTS}; |
265 | |
266 | return attributes[it->second]; |
267 | } |
268 | |
269 | StringRef ComplexKeyCacheDictionary::allocKey(const size_t row, const Columns & key_columns, StringRefs & keys) const |
270 | { |
271 | if (key_size_is_fixed) |
272 | return placeKeysInFixedSizePool(row, key_columns); |
273 | |
274 | return placeKeysInPool(row, key_columns, keys, *dict_struct.key, *keys_pool); |
275 | } |
276 | |
277 | void ComplexKeyCacheDictionary::freeKey(const StringRef key) const |
278 | { |
279 | if (key_size_is_fixed) |
280 | fixed_size_keys_pool->free(const_cast<char *>(key.data)); |
281 | else |
282 | keys_pool->free(const_cast<char *>(key.data), key.size); |
283 | } |
284 | |
285 | template <typename Pool> |
286 | StringRef ComplexKeyCacheDictionary::placeKeysInPool( |
287 | const size_t row, const Columns & key_columns, StringRefs & keys, const std::vector<DictionaryAttribute> & key_attributes, Pool & pool) |
288 | { |
289 | const auto keys_size = key_columns.size(); |
290 | size_t sum_keys_size{}; |
291 | |
292 | for (size_t j = 0; j < keys_size; ++j) |
293 | { |
294 | keys[j] = key_columns[j]->getDataAt(row); |
295 | sum_keys_size += keys[j].size; |
296 | if (key_attributes[j].underlying_type == AttributeUnderlyingType::utString) |
297 | sum_keys_size += sizeof(size_t) + 1; |
298 | } |
299 | |
300 | auto place = pool.alloc(sum_keys_size); |
301 | |
302 | auto key_start = place; |
303 | for (size_t j = 0; j < keys_size; ++j) |
304 | { |
305 | if (key_attributes[j].underlying_type == AttributeUnderlyingType::utString) |
306 | { |
307 | auto start = key_start; |
308 | auto key_size = keys[j].size + 1; |
309 | memcpy(key_start, &key_size, sizeof(size_t)); |
310 | key_start += sizeof(size_t); |
311 | memcpy(key_start, keys[j].data, keys[j].size); |
312 | key_start += keys[j].size; |
313 | *key_start = '\0'; |
314 | ++key_start; |
315 | keys[j].data = start; |
316 | keys[j].size += sizeof(size_t) + 1; |
317 | } |
318 | else |
319 | { |
320 | memcpy(key_start, keys[j].data, keys[j].size); |
321 | keys[j].data = key_start; |
322 | key_start += keys[j].size; |
323 | } |
324 | } |
325 | |
326 | return {place, sum_keys_size}; |
327 | } |
328 | |
329 | /// Explicit instantiations. |
330 | |
331 | template StringRef ComplexKeyCacheDictionary::placeKeysInPool<Arena>( |
332 | const size_t row, |
333 | const Columns & key_columns, |
334 | StringRefs & keys, |
335 | const std::vector<DictionaryAttribute> & key_attributes, |
336 | Arena & pool); |
337 | |
338 | template StringRef ComplexKeyCacheDictionary::placeKeysInPool<ArenaWithFreeLists>( |
339 | const size_t row, |
340 | const Columns & key_columns, |
341 | StringRefs & keys, |
342 | const std::vector<DictionaryAttribute> & key_attributes, |
343 | ArenaWithFreeLists & pool); |
344 | |
345 | |
346 | StringRef ComplexKeyCacheDictionary::placeKeysInFixedSizePool(const size_t row, const Columns & key_columns) const |
347 | { |
348 | const auto res = fixed_size_keys_pool->alloc(); |
349 | auto place = res; |
350 | |
351 | for (const auto & key_column : key_columns) |
352 | { |
353 | const StringRef key = key_column->getDataAt(row); |
354 | memcpy(place, key.data, key.size); |
355 | place += key.size; |
356 | } |
357 | |
358 | return {res, key_size}; |
359 | } |
360 | |
361 | StringRef ComplexKeyCacheDictionary::copyIntoArena(StringRef src, Arena & arena) |
362 | { |
363 | char * allocated = arena.alloc(src.size); |
364 | memcpy(allocated, src.data, src.size); |
365 | return {allocated, src.size}; |
366 | } |
367 | |
368 | StringRef ComplexKeyCacheDictionary::copyKey(const StringRef key) const |
369 | { |
370 | const auto res = key_size_is_fixed ? fixed_size_keys_pool->alloc() : keys_pool->alloc(key.size); |
371 | memcpy(res, key.data, key.size); |
372 | |
373 | return {res, key.size}; |
374 | } |
375 | |
376 | bool ComplexKeyCacheDictionary::isEmptyCell(const UInt64 idx) const |
377 | { |
378 | return ( |
379 | cells[idx].key == StringRef{} |
380 | && (idx != zero_cell_idx || cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()))); |
381 | } |
382 | |
383 | BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const |
384 | { |
385 | std::vector<StringRef> keys; |
386 | { |
387 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
388 | |
389 | for (auto idx : ext::range(0, cells.size())) |
390 | if (!isEmptyCell(idx) && !cells[idx].isDefault()) |
391 | keys.push_back(cells[idx].key); |
392 | } |
393 | |
394 | using BlockInputStreamType = DictionaryBlockInputStream<ComplexKeyCacheDictionary, UInt64>; |
395 | return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, keys, column_names); |
396 | } |
397 | |
398 | void registerDictionaryComplexKeyCache(DictionaryFactory & factory) |
399 | { |
400 | auto create_layout = [=](const std::string & full_name, |
401 | const DictionaryStructure & dict_struct, |
402 | const Poco::Util::AbstractConfiguration & config, |
403 | const std::string & config_prefix, |
404 | DictionarySourcePtr source_ptr) -> DictionaryPtr |
405 | { |
406 | if (!dict_struct.key) |
407 | throw Exception{"'key' is required for dictionary of layout 'complex_key_hashed'" , ErrorCodes::BAD_ARGUMENTS}; |
408 | const auto & layout_prefix = config_prefix + ".layout" ; |
409 | const auto size = config.getInt(layout_prefix + ".complex_key_cache.size_in_cells" ); |
410 | if (size == 0) |
411 | throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells" , ErrorCodes::TOO_SMALL_BUFFER_SIZE}; |
412 | |
413 | const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty" , false); |
414 | if (require_nonempty) |
415 | throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set" , |
416 | ErrorCodes::BAD_ARGUMENTS}; |
417 | |
418 | const String database = config.getString(config_prefix + ".database" , "" ); |
419 | const String name = config.getString(config_prefix + ".name" ); |
420 | const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime" }; |
421 | return std::make_unique<ComplexKeyCacheDictionary>(database, name, dict_struct, std::move(source_ptr), dict_lifetime, size); |
422 | }; |
423 | factory.registerLayout("complex_key_cache" , create_layout, true); |
424 | } |
425 | |
426 | |
427 | } |
428 | |