1#include "CacheDictionary.h"
2
3#include <Columns/ColumnsNumber.h>
4#include <Common/ProfilingScopedRWLock.h>
5#include <Common/typeid_cast.h>
6#include <DataStreams/IBlockInputStream.h>
7#include <ext/chrono_io.h>
8#include <ext/map.h>
9#include <ext/range.h>
10#include <ext/size.h>
11
12namespace ProfileEvents
13{
14extern const Event DictCacheKeysRequested;
15extern const Event DictCacheKeysRequestedMiss;
16extern const Event DictCacheKeysRequestedFound;
17extern const Event DictCacheKeysExpired;
18extern const Event DictCacheKeysNotFound;
19extern const Event DictCacheKeysHit;
20extern const Event DictCacheRequestTimeNs;
21extern const Event DictCacheRequests;
22extern const Event DictCacheLockWriteNs;
23extern const Event DictCacheLockReadNs;
24}
25
26namespace CurrentMetrics
27{
28extern const Metric DictCacheRequests;
29}
30
31namespace DB
32{
namespace ErrorCodes
{
    /// Thrown by update() when the source's id column is not a ColumnUInt64.
    extern const int TYPE_MISMATCH;
}
37
38template <typename AttributeType, typename OutputType, typename DefaultGetter>
39void CacheDictionary::getItemsNumberImpl(
40 Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
41{
42 /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
43 std::unordered_map<Key, std::vector<size_t>> outdated_ids;
44 auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays);
45 const auto rows = ext::size(ids);
46
47 size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
48
49 {
50 const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
51
52 const auto now = std::chrono::system_clock::now();
53 /// fetch up-to-date values, decide which ones require update
54 for (const auto row : ext::range(0, rows))
55 {
56 const auto id = ids[row];
57
58 /** cell should be updated if either:
59 * 1. ids do not match,
60 * 2. cell has expired,
61 * 3. explicit defaults were specified and cell was set default. */
62
63 const auto find_result = findCellIdx(id, now);
64 if (!find_result.valid)
65 {
66 outdated_ids[id].push_back(row);
67 if (find_result.outdated)
68 ++cache_expired;
69 else
70 ++cache_not_found;
71 }
72 else
73 {
74 ++cache_hit;
75 const auto & cell_idx = find_result.cell_idx;
76 const auto & cell = cells[cell_idx];
77 out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]);
78 }
79 }
80 }
81
82 ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
83 ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
84 ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
85
86 query_count.fetch_add(rows, std::memory_order_relaxed);
87 hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
88
89 if (outdated_ids.empty())
90 return;
91
92 std::vector<Key> required_ids(outdated_ids.size());
93 std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
94
95 /// request new values
96 update(
97 required_ids,
98 [&](const auto id, const auto cell_idx)
99 {
100 const auto attribute_value = attribute_array[cell_idx];
101
102 for (const size_t row : outdated_ids[id])
103 out[row] = static_cast<OutputType>(attribute_value);
104 },
105 [&](const auto id, const auto)
106 {
107 for (const size_t row : outdated_ids[id])
108 out[row] = get_default(row);
109 });
110}
111
112template <typename DefaultGetter>
113void CacheDictionary::getItemsString(
114 Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const
115{
116 const auto rows = ext::size(ids);
117
118 /// save on some allocations
119 out->getOffsets().reserve(rows);
120
121 auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays);
122
123 auto found_outdated_values = false;
124
125 /// perform optimistic version, fallback to pessimistic if failed
126 {
127 const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
128
129 const auto now = std::chrono::system_clock::now();
130 /// fetch up-to-date values, discard on fail
131 for (const auto row : ext::range(0, rows))
132 {
133 const auto id = ids[row];
134
135 const auto find_result = findCellIdx(id, now);
136 if (!find_result.valid)
137 {
138 found_outdated_values = true;
139 break;
140 }
141 else
142 {
143 const auto & cell_idx = find_result.cell_idx;
144 const auto & cell = cells[cell_idx];
145 const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
146 out->insertData(string_ref.data, string_ref.size);
147 }
148 }
149 }
150
151 /// optimistic code completed successfully
152 if (!found_outdated_values)
153 {
154 query_count.fetch_add(rows, std::memory_order_relaxed);
155 hit_count.fetch_add(rows, std::memory_order_release);
156 return;
157 }
158
159 /// now onto the pessimistic one, discard possible partial results from the optimistic path
160 out->getChars().resize_assume_reserved(0);
161 out->getOffsets().resize_assume_reserved(0);
162
163 /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
164 std::unordered_map<Key, std::vector<size_t>> outdated_ids;
165 /// we are going to store every string separately
166 std::unordered_map<Key, String> map;
167
168 size_t total_length = 0;
169 size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
170 {
171 const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
172
173 const auto now = std::chrono::system_clock::now();
174 for (const auto row : ext::range(0, ids.size()))
175 {
176 const auto id = ids[row];
177
178 const auto find_result = findCellIdx(id, now);
179 if (!find_result.valid)
180 {
181 outdated_ids[id].push_back(row);
182 if (find_result.outdated)
183 ++cache_expired;
184 else
185 ++cache_not_found;
186 }
187 else
188 {
189 ++cache_hit;
190 const auto & cell_idx = find_result.cell_idx;
191 const auto & cell = cells[cell_idx];
192 const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
193
194 if (!cell.isDefault())
195 map[id] = String{string_ref};
196
197 total_length += string_ref.size + 1;
198 }
199 }
200 }
201
202 ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
203 ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
204 ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
205
206 query_count.fetch_add(rows, std::memory_order_relaxed);
207 hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
208
209 /// request new values
210 if (!outdated_ids.empty())
211 {
212 std::vector<Key> required_ids(outdated_ids.size());
213 std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
214
215 update(
216 required_ids,
217 [&](const auto id, const auto cell_idx)
218 {
219 const auto attribute_value = attribute_array[cell_idx];
220
221 map[id] = String{attribute_value};
222 total_length += (attribute_value.size + 1) * outdated_ids[id].size();
223 },
224 [&](const auto id, const auto)
225 {
226 for (const auto row : outdated_ids[id])
227 total_length += get_default(row).size + 1;
228 });
229 }
230
231 out->getChars().reserve(total_length);
232
233 for (const auto row : ext::range(0, ext::size(ids)))
234 {
235 const auto id = ids[row];
236 const auto it = map.find(id);
237
238 const auto string_ref = it != std::end(map) ? StringRef{it->second} : get_default(row);
239 out->insertData(string_ref.data, string_ref.size);
240 }
241}
242
243template <typename PresentIdHandler, typename AbsentIdHandler>
244void CacheDictionary::update(
245 const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
246{
247 CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
248 ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
249
250 std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
251 for (const auto id : requested_ids)
252 remaining_ids.insert({id, 0});
253
254 const auto now = std::chrono::system_clock::now();
255
256 const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
257
258 if (now > backoff_end_time)
259 {
260 try
261 {
262 if (error_count)
263 {
264 /// Recover after error: we have to clone the source here because
265 /// it could keep connections which should be reset after error.
266 source_ptr = source_ptr->clone();
267 }
268
269 Stopwatch watch;
270 auto stream = source_ptr->loadIds(requested_ids);
271 stream->readPrefix();
272
273 while (const auto block = stream->read())
274 {
275 const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
276 if (!id_column)
277 throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
278
279 const auto & ids = id_column->getData();
280
281 /// cache column pointers
282 const auto column_ptrs = ext::map<std::vector>(
283 ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
284
285 for (const auto i : ext::range(0, ids.size()))
286 {
287 const auto id = ids[i];
288
289 const auto find_result = findCellIdx(id, now);
290 const auto & cell_idx = find_result.cell_idx;
291
292 auto & cell = cells[cell_idx];
293
294 for (const auto attribute_idx : ext::range(0, attributes.size()))
295 {
296 const auto & attribute_column = *column_ptrs[attribute_idx];
297 auto & attribute = attributes[attribute_idx];
298
299 setAttributeValue(attribute, cell_idx, attribute_column[i]);
300 }
301
302 /// if cell id is zero and zero does not map to this cell, then the cell is unused
303 if (cell.id == 0 && cell_idx != zero_cell_idx)
304 element_count.fetch_add(1, std::memory_order_relaxed);
305
306 cell.id = id;
307 if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
308 {
309 std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
310 cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
311 }
312 else
313 cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
314
315 /// inform caller
316 on_cell_updated(id, cell_idx);
317 /// mark corresponding id as found
318 remaining_ids[id] = 1;
319 }
320 }
321
322 stream->readSuffix();
323
324 error_count = 0;
325 last_exception = std::exception_ptr{};
326 backoff_end_time = std::chrono::system_clock::time_point{};
327
328 ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
329 }
330 catch (...)
331 {
332 ++error_count;
333 last_exception = std::current_exception();
334 backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
335
336 tryLogException(last_exception, log, "Could not update cache dictionary '" + getFullName() +
337 "', next update is scheduled at " + ext::to_string(backoff_end_time));
338 }
339 }
340
341 size_t not_found_num = 0, found_num = 0;
342
343 /// Check which ids have not been found and require setting null_value
344 for (const auto & id_found_pair : remaining_ids)
345 {
346 if (id_found_pair.second)
347 {
348 ++found_num;
349 continue;
350 }
351 ++not_found_num;
352
353 const auto id = id_found_pair.first;
354
355 const auto find_result = findCellIdx(id, now);
356 const auto & cell_idx = find_result.cell_idx;
357 auto & cell = cells[cell_idx];
358
359 if (error_count)
360 {
361 if (find_result.outdated)
362 {
363 /// We have expired data for that `id` so we can continue using it.
364 bool was_default = cell.isDefault();
365 cell.setExpiresAt(backoff_end_time);
366 if (was_default)
367 cell.setDefault();
368 if (was_default)
369 on_id_not_found(id, cell_idx);
370 else
371 on_cell_updated(id, cell_idx);
372 continue;
373 }
374 /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
375 std::rethrow_exception(last_exception);
376 }
377
378 /// Check if cell had not been occupied before and increment element counter if it hadn't
379 if (cell.id == 0 && cell_idx != zero_cell_idx)
380 element_count.fetch_add(1, std::memory_order_relaxed);
381
382 cell.id = id;
383
384 if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
385 {
386 std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
387 cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
388 }
389 else
390 cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
391
392 /// Set null_value for each attribute
393 cell.setDefault();
394 for (auto & attribute : attributes)
395 setDefaultAttributeValue(attribute, cell_idx);
396
397 /// inform caller that the cell has not been found
398 on_id_not_found(id, cell_idx);
399 }
400
401 ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
402 ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
403 ProfileEvents::increment(ProfileEvents::DictCacheRequests);
404}
405
406}
407