| 1 | #include "ComplexKeyCacheDictionary.h" |
| 2 | #include <Common/Arena.h> |
| 3 | #include <Common/BitHelpers.h> |
| 4 | #include <Common/CurrentMetrics.h> |
| 5 | #include <Common/ProfileEvents.h> |
| 6 | #include <Common/ProfilingScopedRWLock.h> |
| 7 | #include <Common/Stopwatch.h> |
| 8 | #include <Common/randomSeed.h> |
| 9 | #include <ext/map.h> |
| 10 | #include <ext/range.h> |
| 11 | #include "DictionaryBlockInputStream.h" |
| 12 | #include "DictionaryFactory.h" |
| 13 | |
| 14 | |
| 15 | namespace ProfileEvents |
| 16 | { |
| 17 | extern const Event DictCacheKeysRequested; |
| 18 | extern const Event DictCacheKeysRequestedMiss; |
| 19 | extern const Event DictCacheKeysRequestedFound; |
| 20 | extern const Event DictCacheKeysExpired; |
| 21 | extern const Event DictCacheKeysNotFound; |
| 22 | extern const Event DictCacheKeysHit; |
| 23 | extern const Event DictCacheRequestTimeNs; |
| 24 | extern const Event DictCacheLockWriteNs; |
| 25 | extern const Event DictCacheLockReadNs; |
| 26 | } |
| 27 | |
| 28 | namespace CurrentMetrics |
| 29 | { |
| 30 | extern const Metric DictCacheRequests; |
| 31 | } |
| 32 | |
| 33 | |
| 34 | namespace DB |
| 35 | { |
| 36 | namespace ErrorCodes |
| 37 | { |
| 38 | extern const int TYPE_MISMATCH; |
| 39 | extern const int BAD_ARGUMENTS; |
| 40 | extern const int UNSUPPORTED_METHOD; |
| 41 | extern const int TOO_SMALL_BUFFER_SIZE; |
| 42 | } |
| 43 | |
| 44 | |
| 45 | inline UInt64 ComplexKeyCacheDictionary::getCellIdx(const StringRef key) const |
| 46 | { |
| 47 | const auto hash = StringRefHash{}(key); |
| 48 | const auto idx = hash & size_overlap_mask; |
| 49 | return idx; |
| 50 | } |
| 51 | |
| 52 | |
| 53 | ComplexKeyCacheDictionary::ComplexKeyCacheDictionary( |
| 54 | const std::string & database_, |
| 55 | const std::string & name_, |
| 56 | const DictionaryStructure & dict_struct_, |
| 57 | DictionarySourcePtr source_ptr_, |
| 58 | const DictionaryLifetime dict_lifetime_, |
| 59 | const size_t size_) |
| 60 | : database(database_) |
| 61 | , name(name_) |
| 62 | , full_name{database_.empty() ? name_ : (database_ + "." + name_)} |
| 63 | , dict_struct(dict_struct_) |
| 64 | , source_ptr{std::move(source_ptr_)} |
| 65 | , dict_lifetime(dict_lifetime_) |
| 66 | , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))} |
| 67 | , size_overlap_mask{this->size - 1} |
| 68 | , rnd_engine(randomSeed()) |
| 69 | { |
| 70 | if (!this->source_ptr->supportsSelectiveLoad()) |
| 71 | throw Exception{full_name + ": source cannot be used with ComplexKeyCacheDictionary" , ErrorCodes::UNSUPPORTED_METHOD}; |
| 72 | |
| 73 | createAttributes(); |
| 74 | } |
| 75 | |
| 76 | |
| 77 | void ComplexKeyCacheDictionary::getString( |
| 78 | const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const |
| 79 | { |
| 80 | dict_struct.validateKeyTypes(key_types); |
| 81 | |
| 82 | auto & attribute = getAttribute(attribute_name); |
| 83 | checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString); |
| 84 | |
| 85 | const auto null_value = StringRef{std::get<String>(attribute.null_values)}; |
| 86 | |
| 87 | getItemsString(attribute, key_columns, out, [&](const size_t) { return null_value; }); |
| 88 | } |
| 89 | |
| 90 | void ComplexKeyCacheDictionary::getString( |
| 91 | const std::string & attribute_name, |
| 92 | const Columns & key_columns, |
| 93 | const DataTypes & key_types, |
| 94 | const ColumnString * const def, |
| 95 | ColumnString * const out) const |
| 96 | { |
| 97 | dict_struct.validateKeyTypes(key_types); |
| 98 | |
| 99 | auto & attribute = getAttribute(attribute_name); |
| 100 | checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString); |
| 101 | |
| 102 | getItemsString(attribute, key_columns, out, [&](const size_t row) { return def->getDataAt(row); }); |
| 103 | } |
| 104 | |
| 105 | void ComplexKeyCacheDictionary::getString( |
| 106 | const std::string & attribute_name, |
| 107 | const Columns & key_columns, |
| 108 | const DataTypes & key_types, |
| 109 | const String & def, |
| 110 | ColumnString * const out) const |
| 111 | { |
| 112 | dict_struct.validateKeyTypes(key_types); |
| 113 | |
| 114 | auto & attribute = getAttribute(attribute_name); |
| 115 | checkAttributeType(full_name, attribute_name, attribute.type, AttributeUnderlyingType::utString); |
| 116 | |
| 117 | getItemsString(attribute, key_columns, out, [&](const size_t) { return StringRef{def}; }); |
| 118 | } |
| 119 | |
| 120 | /// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag, |
| 121 | /// true false found and valid |
| 122 | /// false true not found (something outdated, maybe our cell) |
| 123 | /// false false not found (other id stored with valid data) |
| 124 | /// true true impossible |
| 125 | /// |
| 126 | /// todo: split this func to two: find_for_get and find_for_set |
| 127 | ComplexKeyCacheDictionary::FindResult |
| 128 | ComplexKeyCacheDictionary::findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const |
| 129 | { |
| 130 | auto pos = hash; |
| 131 | auto oldest_id = pos; |
| 132 | auto oldest_time = CellMetadata::time_point_t::max(); |
| 133 | const auto stop = pos + max_collision_length; |
| 134 | |
| 135 | for (; pos < stop; ++pos) |
| 136 | { |
| 137 | const auto cell_idx = pos & size_overlap_mask; |
| 138 | const auto & cell = cells[cell_idx]; |
| 139 | |
| 140 | if (cell.hash != hash || cell.key != key) |
| 141 | { |
| 142 | /// maybe we already found nearest expired cell |
| 143 | if (oldest_time > now && oldest_time > cell.expiresAt()) |
| 144 | { |
| 145 | oldest_time = cell.expiresAt(); |
| 146 | oldest_id = cell_idx; |
| 147 | } |
| 148 | |
| 149 | continue; |
| 150 | } |
| 151 | |
| 152 | if (cell.expiresAt() < now) |
| 153 | { |
| 154 | return {cell_idx, false, true}; |
| 155 | } |
| 156 | |
| 157 | return {cell_idx, true, false}; |
| 158 | } |
| 159 | |
| 160 | oldest_id &= size_overlap_mask; |
| 161 | return {oldest_id, false, false}; |
| 162 | } |
| 163 | |
| 164 | void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const |
| 165 | { |
| 166 | dict_struct.validateKeyTypes(key_types); |
| 167 | |
| 168 | /// Mapping: <key> -> { all indices `i` of `key_columns` such that `key_columns[i]` = <key> } |
| 169 | MapType<std::vector<size_t>> outdated_keys; |
| 170 | |
| 171 | |
| 172 | const auto rows_num = key_columns.front()->size(); |
| 173 | const auto keys_size = dict_struct.key->size(); |
| 174 | StringRefs keys(keys_size); |
| 175 | Arena temporary_keys_pool; |
| 176 | PODArray<StringRef> keys_array(rows_num); |
| 177 | |
| 178 | size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; |
| 179 | { |
| 180 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
| 181 | |
| 182 | const auto now = std::chrono::system_clock::now(); |
| 183 | /// fetch up-to-date values, decide which ones require update |
| 184 | for (const auto row : ext::range(0, rows_num)) |
| 185 | { |
| 186 | const StringRef key = placeKeysInPool(row, key_columns, keys, *dict_struct.key, temporary_keys_pool); |
| 187 | keys_array[row] = key; |
| 188 | const auto find_result = findCellIdx(key, now); |
| 189 | const auto & cell_idx = find_result.cell_idx; |
| 190 | /** cell should be updated if either: |
| 191 | * 1. keys (or hash) do not match, |
| 192 | * 2. cell has expired, |
| 193 | * 3. explicit defaults were specified and cell was set default. */ |
| 194 | if (!find_result.valid) |
| 195 | { |
| 196 | outdated_keys[key].push_back(row); |
| 197 | if (find_result.outdated) |
| 198 | ++cache_expired; |
| 199 | else |
| 200 | ++cache_not_found; |
| 201 | } |
| 202 | else |
| 203 | { |
| 204 | ++cache_hit; |
| 205 | const auto & cell = cells[cell_idx]; |
| 206 | out[row] = !cell.isDefault(); |
| 207 | } |
| 208 | } |
| 209 | } |
| 210 | ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); |
| 211 | ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); |
| 212 | ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); |
| 213 | |
| 214 | query_count.fetch_add(rows_num, std::memory_order_relaxed); |
| 215 | hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); |
| 216 | |
| 217 | if (outdated_keys.empty()) |
| 218 | return; |
| 219 | |
| 220 | std::vector<size_t> required_rows(outdated_keys.size()); |
| 221 | std::transform( |
| 222 | std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), [](auto & pair) { return pair.getMapped().front(); }); |
| 223 | |
| 224 | /// request new values |
| 225 | update( |
| 226 | key_columns, |
| 227 | keys_array, |
| 228 | required_rows, |
| 229 | [&](const StringRef key, const auto) |
| 230 | { |
| 231 | for (const auto out_idx : outdated_keys[key]) |
| 232 | out[out_idx] = true; |
| 233 | }, |
| 234 | [&](const StringRef key, const auto) |
| 235 | { |
| 236 | for (const auto out_idx : outdated_keys[key]) |
| 237 | out[out_idx] = false; |
| 238 | }); |
| 239 | } |
| 240 | |
| 241 | void ComplexKeyCacheDictionary::createAttributes() |
| 242 | { |
| 243 | const auto attributes_size = dict_struct.attributes.size(); |
| 244 | attributes.reserve(attributes_size); |
| 245 | |
| 246 | bytes_allocated += size * sizeof(CellMetadata); |
| 247 | bytes_allocated += attributes_size * sizeof(attributes.front()); |
| 248 | |
| 249 | for (const auto & attribute : dict_struct.attributes) |
| 250 | { |
| 251 | attribute_index_by_name.emplace(attribute.name, attributes.size()); |
| 252 | attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value)); |
| 253 | |
| 254 | if (attribute.hierarchical) |
| 255 | throw Exception{full_name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(), |
| 256 | ErrorCodes::TYPE_MISMATCH}; |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | ComplexKeyCacheDictionary::Attribute & ComplexKeyCacheDictionary::getAttribute(const std::string & attribute_name) const |
| 261 | { |
| 262 | const auto it = attribute_index_by_name.find(attribute_name); |
| 263 | if (it == std::end(attribute_index_by_name)) |
| 264 | throw Exception{full_name + ": no such attribute '" + attribute_name + "'" , ErrorCodes::BAD_ARGUMENTS}; |
| 265 | |
| 266 | return attributes[it->second]; |
| 267 | } |
| 268 | |
| 269 | StringRef ComplexKeyCacheDictionary::allocKey(const size_t row, const Columns & key_columns, StringRefs & keys) const |
| 270 | { |
| 271 | if (key_size_is_fixed) |
| 272 | return placeKeysInFixedSizePool(row, key_columns); |
| 273 | |
| 274 | return placeKeysInPool(row, key_columns, keys, *dict_struct.key, *keys_pool); |
| 275 | } |
| 276 | |
| 277 | void ComplexKeyCacheDictionary::freeKey(const StringRef key) const |
| 278 | { |
| 279 | if (key_size_is_fixed) |
| 280 | fixed_size_keys_pool->free(const_cast<char *>(key.data)); |
| 281 | else |
| 282 | keys_pool->free(const_cast<char *>(key.data), key.size); |
| 283 | } |
| 284 | |
| 285 | template <typename Pool> |
| 286 | StringRef ComplexKeyCacheDictionary::placeKeysInPool( |
| 287 | const size_t row, const Columns & key_columns, StringRefs & keys, const std::vector<DictionaryAttribute> & key_attributes, Pool & pool) |
| 288 | { |
| 289 | const auto keys_size = key_columns.size(); |
| 290 | size_t sum_keys_size{}; |
| 291 | |
| 292 | for (size_t j = 0; j < keys_size; ++j) |
| 293 | { |
| 294 | keys[j] = key_columns[j]->getDataAt(row); |
| 295 | sum_keys_size += keys[j].size; |
| 296 | if (key_attributes[j].underlying_type == AttributeUnderlyingType::utString) |
| 297 | sum_keys_size += sizeof(size_t) + 1; |
| 298 | } |
| 299 | |
| 300 | auto place = pool.alloc(sum_keys_size); |
| 301 | |
| 302 | auto key_start = place; |
| 303 | for (size_t j = 0; j < keys_size; ++j) |
| 304 | { |
| 305 | if (key_attributes[j].underlying_type == AttributeUnderlyingType::utString) |
| 306 | { |
| 307 | auto start = key_start; |
| 308 | auto key_size = keys[j].size + 1; |
| 309 | memcpy(key_start, &key_size, sizeof(size_t)); |
| 310 | key_start += sizeof(size_t); |
| 311 | memcpy(key_start, keys[j].data, keys[j].size); |
| 312 | key_start += keys[j].size; |
| 313 | *key_start = '\0'; |
| 314 | ++key_start; |
| 315 | keys[j].data = start; |
| 316 | keys[j].size += sizeof(size_t) + 1; |
| 317 | } |
| 318 | else |
| 319 | { |
| 320 | memcpy(key_start, keys[j].data, keys[j].size); |
| 321 | keys[j].data = key_start; |
| 322 | key_start += keys[j].size; |
| 323 | } |
| 324 | } |
| 325 | |
| 326 | return {place, sum_keys_size}; |
| 327 | } |
| 328 | |
| 329 | /// Explicit instantiations. |
| 330 | |
| 331 | template StringRef ComplexKeyCacheDictionary::placeKeysInPool<Arena>( |
| 332 | const size_t row, |
| 333 | const Columns & key_columns, |
| 334 | StringRefs & keys, |
| 335 | const std::vector<DictionaryAttribute> & key_attributes, |
| 336 | Arena & pool); |
| 337 | |
| 338 | template StringRef ComplexKeyCacheDictionary::placeKeysInPool<ArenaWithFreeLists>( |
| 339 | const size_t row, |
| 340 | const Columns & key_columns, |
| 341 | StringRefs & keys, |
| 342 | const std::vector<DictionaryAttribute> & key_attributes, |
| 343 | ArenaWithFreeLists & pool); |
| 344 | |
| 345 | |
| 346 | StringRef ComplexKeyCacheDictionary::placeKeysInFixedSizePool(const size_t row, const Columns & key_columns) const |
| 347 | { |
| 348 | const auto res = fixed_size_keys_pool->alloc(); |
| 349 | auto place = res; |
| 350 | |
| 351 | for (const auto & key_column : key_columns) |
| 352 | { |
| 353 | const StringRef key = key_column->getDataAt(row); |
| 354 | memcpy(place, key.data, key.size); |
| 355 | place += key.size; |
| 356 | } |
| 357 | |
| 358 | return {res, key_size}; |
| 359 | } |
| 360 | |
| 361 | StringRef ComplexKeyCacheDictionary::copyIntoArena(StringRef src, Arena & arena) |
| 362 | { |
| 363 | char * allocated = arena.alloc(src.size); |
| 364 | memcpy(allocated, src.data, src.size); |
| 365 | return {allocated, src.size}; |
| 366 | } |
| 367 | |
| 368 | StringRef ComplexKeyCacheDictionary::copyKey(const StringRef key) const |
| 369 | { |
| 370 | const auto res = key_size_is_fixed ? fixed_size_keys_pool->alloc() : keys_pool->alloc(key.size); |
| 371 | memcpy(res, key.data, key.size); |
| 372 | |
| 373 | return {res, key.size}; |
| 374 | } |
| 375 | |
| 376 | bool ComplexKeyCacheDictionary::isEmptyCell(const UInt64 idx) const |
| 377 | { |
| 378 | return ( |
| 379 | cells[idx].key == StringRef{} |
| 380 | && (idx != zero_cell_idx || cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t()))); |
| 381 | } |
| 382 | |
| 383 | BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const |
| 384 | { |
| 385 | std::vector<StringRef> keys; |
| 386 | { |
| 387 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
| 388 | |
| 389 | for (auto idx : ext::range(0, cells.size())) |
| 390 | if (!isEmptyCell(idx) && !cells[idx].isDefault()) |
| 391 | keys.push_back(cells[idx].key); |
| 392 | } |
| 393 | |
| 394 | using BlockInputStreamType = DictionaryBlockInputStream<ComplexKeyCacheDictionary, UInt64>; |
| 395 | return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, keys, column_names); |
| 396 | } |
| 397 | |
| 398 | void registerDictionaryComplexKeyCache(DictionaryFactory & factory) |
| 399 | { |
| 400 | auto create_layout = [=](const std::string & full_name, |
| 401 | const DictionaryStructure & dict_struct, |
| 402 | const Poco::Util::AbstractConfiguration & config, |
| 403 | const std::string & config_prefix, |
| 404 | DictionarySourcePtr source_ptr) -> DictionaryPtr |
| 405 | { |
| 406 | if (!dict_struct.key) |
| 407 | throw Exception{"'key' is required for dictionary of layout 'complex_key_hashed'" , ErrorCodes::BAD_ARGUMENTS}; |
| 408 | const auto & layout_prefix = config_prefix + ".layout" ; |
| 409 | const auto size = config.getInt(layout_prefix + ".complex_key_cache.size_in_cells" ); |
| 410 | if (size == 0) |
| 411 | throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells" , ErrorCodes::TOO_SMALL_BUFFER_SIZE}; |
| 412 | |
| 413 | const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty" , false); |
| 414 | if (require_nonempty) |
| 415 | throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set" , |
| 416 | ErrorCodes::BAD_ARGUMENTS}; |
| 417 | |
| 418 | const String database = config.getString(config_prefix + ".database" , "" ); |
| 419 | const String name = config.getString(config_prefix + ".name" ); |
| 420 | const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime" }; |
| 421 | return std::make_unique<ComplexKeyCacheDictionary>(database, name, dict_struct, std::move(source_ptr), dict_lifetime, size); |
| 422 | }; |
| 423 | factory.registerLayout("complex_key_cache" , create_layout, true); |
| 424 | } |
| 425 | |
| 426 | |
| 427 | } |
| 428 | |