1 | #include "CacheDictionary.h" |
2 | |
3 | #include <Columns/ColumnsNumber.h> |
4 | #include <Common/ProfilingScopedRWLock.h> |
5 | #include <Common/typeid_cast.h> |
6 | #include <DataStreams/IBlockInputStream.h> |
7 | #include <ext/chrono_io.h> |
8 | #include <ext/map.h> |
9 | #include <ext/range.h> |
10 | #include <ext/size.h> |
11 | |
12 | namespace ProfileEvents |
13 | { |
14 | extern const Event DictCacheKeysRequested; |
15 | extern const Event DictCacheKeysRequestedMiss; |
16 | extern const Event DictCacheKeysRequestedFound; |
17 | extern const Event DictCacheKeysExpired; |
18 | extern const Event DictCacheKeysNotFound; |
19 | extern const Event DictCacheKeysHit; |
20 | extern const Event DictCacheRequestTimeNs; |
21 | extern const Event DictCacheRequests; |
22 | extern const Event DictCacheLockWriteNs; |
23 | extern const Event DictCacheLockReadNs; |
24 | } |
25 | |
26 | namespace CurrentMetrics |
27 | { |
28 | extern const Metric DictCacheRequests; |
29 | } |
30 | |
31 | namespace DB |
32 | { |
33 | namespace ErrorCodes |
34 | { |
35 | extern const int TYPE_MISMATCH; |
36 | } |
37 | |
38 | template <typename AttributeType, typename OutputType, typename DefaultGetter> |
39 | void CacheDictionary::getItemsNumberImpl( |
40 | Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const |
41 | { |
42 | /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> } |
43 | std::unordered_map<Key, std::vector<size_t>> outdated_ids; |
44 | auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays); |
45 | const auto rows = ext::size(ids); |
46 | |
47 | size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; |
48 | |
49 | { |
50 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
51 | |
52 | const auto now = std::chrono::system_clock::now(); |
53 | /// fetch up-to-date values, decide which ones require update |
54 | for (const auto row : ext::range(0, rows)) |
55 | { |
56 | const auto id = ids[row]; |
57 | |
58 | /** cell should be updated if either: |
59 | * 1. ids do not match, |
60 | * 2. cell has expired, |
61 | * 3. explicit defaults were specified and cell was set default. */ |
62 | |
63 | const auto find_result = findCellIdx(id, now); |
64 | if (!find_result.valid) |
65 | { |
66 | outdated_ids[id].push_back(row); |
67 | if (find_result.outdated) |
68 | ++cache_expired; |
69 | else |
70 | ++cache_not_found; |
71 | } |
72 | else |
73 | { |
74 | ++cache_hit; |
75 | const auto & cell_idx = find_result.cell_idx; |
76 | const auto & cell = cells[cell_idx]; |
77 | out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]); |
78 | } |
79 | } |
80 | } |
81 | |
82 | ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); |
83 | ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); |
84 | ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); |
85 | |
86 | query_count.fetch_add(rows, std::memory_order_relaxed); |
87 | hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release); |
88 | |
89 | if (outdated_ids.empty()) |
90 | return; |
91 | |
92 | std::vector<Key> required_ids(outdated_ids.size()); |
93 | std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); |
94 | |
95 | /// request new values |
96 | update( |
97 | required_ids, |
98 | [&](const auto id, const auto cell_idx) |
99 | { |
100 | const auto attribute_value = attribute_array[cell_idx]; |
101 | |
102 | for (const size_t row : outdated_ids[id]) |
103 | out[row] = static_cast<OutputType>(attribute_value); |
104 | }, |
105 | [&](const auto id, const auto) |
106 | { |
107 | for (const size_t row : outdated_ids[id]) |
108 | out[row] = get_default(row); |
109 | }); |
110 | } |
111 | |
112 | template <typename DefaultGetter> |
113 | void CacheDictionary::getItemsString( |
114 | Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const |
115 | { |
116 | const auto rows = ext::size(ids); |
117 | |
118 | /// save on some allocations |
119 | out->getOffsets().reserve(rows); |
120 | |
121 | auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays); |
122 | |
123 | auto found_outdated_values = false; |
124 | |
125 | /// perform optimistic version, fallback to pessimistic if failed |
126 | { |
127 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
128 | |
129 | const auto now = std::chrono::system_clock::now(); |
130 | /// fetch up-to-date values, discard on fail |
131 | for (const auto row : ext::range(0, rows)) |
132 | { |
133 | const auto id = ids[row]; |
134 | |
135 | const auto find_result = findCellIdx(id, now); |
136 | if (!find_result.valid) |
137 | { |
138 | found_outdated_values = true; |
139 | break; |
140 | } |
141 | else |
142 | { |
143 | const auto & cell_idx = find_result.cell_idx; |
144 | const auto & cell = cells[cell_idx]; |
145 | const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; |
146 | out->insertData(string_ref.data, string_ref.size); |
147 | } |
148 | } |
149 | } |
150 | |
151 | /// optimistic code completed successfully |
152 | if (!found_outdated_values) |
153 | { |
154 | query_count.fetch_add(rows, std::memory_order_relaxed); |
155 | hit_count.fetch_add(rows, std::memory_order_release); |
156 | return; |
157 | } |
158 | |
159 | /// now onto the pessimistic one, discard possible partial results from the optimistic path |
160 | out->getChars().resize_assume_reserved(0); |
161 | out->getOffsets().resize_assume_reserved(0); |
162 | |
163 | /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> } |
164 | std::unordered_map<Key, std::vector<size_t>> outdated_ids; |
165 | /// we are going to store every string separately |
166 | std::unordered_map<Key, String> map; |
167 | |
168 | size_t total_length = 0; |
169 | size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; |
170 | { |
171 | const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; |
172 | |
173 | const auto now = std::chrono::system_clock::now(); |
174 | for (const auto row : ext::range(0, ids.size())) |
175 | { |
176 | const auto id = ids[row]; |
177 | |
178 | const auto find_result = findCellIdx(id, now); |
179 | if (!find_result.valid) |
180 | { |
181 | outdated_ids[id].push_back(row); |
182 | if (find_result.outdated) |
183 | ++cache_expired; |
184 | else |
185 | ++cache_not_found; |
186 | } |
187 | else |
188 | { |
189 | ++cache_hit; |
190 | const auto & cell_idx = find_result.cell_idx; |
191 | const auto & cell = cells[cell_idx]; |
192 | const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; |
193 | |
194 | if (!cell.isDefault()) |
195 | map[id] = String{string_ref}; |
196 | |
197 | total_length += string_ref.size + 1; |
198 | } |
199 | } |
200 | } |
201 | |
202 | ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); |
203 | ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); |
204 | ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); |
205 | |
206 | query_count.fetch_add(rows, std::memory_order_relaxed); |
207 | hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release); |
208 | |
209 | /// request new values |
210 | if (!outdated_ids.empty()) |
211 | { |
212 | std::vector<Key> required_ids(outdated_ids.size()); |
213 | std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); |
214 | |
215 | update( |
216 | required_ids, |
217 | [&](const auto id, const auto cell_idx) |
218 | { |
219 | const auto attribute_value = attribute_array[cell_idx]; |
220 | |
221 | map[id] = String{attribute_value}; |
222 | total_length += (attribute_value.size + 1) * outdated_ids[id].size(); |
223 | }, |
224 | [&](const auto id, const auto) |
225 | { |
226 | for (const auto row : outdated_ids[id]) |
227 | total_length += get_default(row).size + 1; |
228 | }); |
229 | } |
230 | |
231 | out->getChars().reserve(total_length); |
232 | |
233 | for (const auto row : ext::range(0, ext::size(ids))) |
234 | { |
235 | const auto id = ids[row]; |
236 | const auto it = map.find(id); |
237 | |
238 | const auto string_ref = it != std::end(map) ? StringRef{it->second} : get_default(row); |
239 | out->insertData(string_ref.data, string_ref.size); |
240 | } |
241 | } |
242 | |
243 | template <typename PresentIdHandler, typename AbsentIdHandler> |
244 | void CacheDictionary::update( |
245 | const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const |
246 | { |
247 | CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; |
248 | ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size()); |
249 | |
250 | std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()}; |
251 | for (const auto id : requested_ids) |
252 | remaining_ids.insert({id, 0}); |
253 | |
254 | const auto now = std::chrono::system_clock::now(); |
255 | |
256 | const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; |
257 | |
258 | if (now > backoff_end_time) |
259 | { |
260 | try |
261 | { |
262 | if (error_count) |
263 | { |
264 | /// Recover after error: we have to clone the source here because |
265 | /// it could keep connections which should be reset after error. |
266 | source_ptr = source_ptr->clone(); |
267 | } |
268 | |
269 | Stopwatch watch; |
270 | auto stream = source_ptr->loadIds(requested_ids); |
271 | stream->readPrefix(); |
272 | |
273 | while (const auto block = stream->read()) |
274 | { |
275 | const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get()); |
276 | if (!id_column) |
277 | throw Exception{name + ": id column has type different from UInt64." , ErrorCodes::TYPE_MISMATCH}; |
278 | |
279 | const auto & ids = id_column->getData(); |
280 | |
281 | /// cache column pointers |
282 | const auto column_ptrs = ext::map<std::vector>( |
283 | ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); }); |
284 | |
285 | for (const auto i : ext::range(0, ids.size())) |
286 | { |
287 | const auto id = ids[i]; |
288 | |
289 | const auto find_result = findCellIdx(id, now); |
290 | const auto & cell_idx = find_result.cell_idx; |
291 | |
292 | auto & cell = cells[cell_idx]; |
293 | |
294 | for (const auto attribute_idx : ext::range(0, attributes.size())) |
295 | { |
296 | const auto & attribute_column = *column_ptrs[attribute_idx]; |
297 | auto & attribute = attributes[attribute_idx]; |
298 | |
299 | setAttributeValue(attribute, cell_idx, attribute_column[i]); |
300 | } |
301 | |
302 | /// if cell id is zero and zero does not map to this cell, then the cell is unused |
303 | if (cell.id == 0 && cell_idx != zero_cell_idx) |
304 | element_count.fetch_add(1, std::memory_order_relaxed); |
305 | |
306 | cell.id = id; |
307 | if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) |
308 | { |
309 | std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; |
310 | cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); |
311 | } |
312 | else |
313 | cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max()); |
314 | |
315 | /// inform caller |
316 | on_cell_updated(id, cell_idx); |
317 | /// mark corresponding id as found |
318 | remaining_ids[id] = 1; |
319 | } |
320 | } |
321 | |
322 | stream->readSuffix(); |
323 | |
324 | error_count = 0; |
325 | last_exception = std::exception_ptr{}; |
326 | backoff_end_time = std::chrono::system_clock::time_point{}; |
327 | |
328 | ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed()); |
329 | } |
330 | catch (...) |
331 | { |
332 | ++error_count; |
333 | last_exception = std::current_exception(); |
334 | backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count)); |
335 | |
336 | tryLogException(last_exception, log, "Could not update cache dictionary '" + getFullName() + |
337 | "', next update is scheduled at " + ext::to_string(backoff_end_time)); |
338 | } |
339 | } |
340 | |
341 | size_t not_found_num = 0, found_num = 0; |
342 | |
343 | /// Check which ids have not been found and require setting null_value |
344 | for (const auto & id_found_pair : remaining_ids) |
345 | { |
346 | if (id_found_pair.second) |
347 | { |
348 | ++found_num; |
349 | continue; |
350 | } |
351 | ++not_found_num; |
352 | |
353 | const auto id = id_found_pair.first; |
354 | |
355 | const auto find_result = findCellIdx(id, now); |
356 | const auto & cell_idx = find_result.cell_idx; |
357 | auto & cell = cells[cell_idx]; |
358 | |
359 | if (error_count) |
360 | { |
361 | if (find_result.outdated) |
362 | { |
363 | /// We have expired data for that `id` so we can continue using it. |
364 | bool was_default = cell.isDefault(); |
365 | cell.setExpiresAt(backoff_end_time); |
366 | if (was_default) |
367 | cell.setDefault(); |
368 | if (was_default) |
369 | on_id_not_found(id, cell_idx); |
370 | else |
371 | on_cell_updated(id, cell_idx); |
372 | continue; |
373 | } |
374 | /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`. |
375 | std::rethrow_exception(last_exception); |
376 | } |
377 | |
378 | /// Check if cell had not been occupied before and increment element counter if it hadn't |
379 | if (cell.id == 0 && cell_idx != zero_cell_idx) |
380 | element_count.fetch_add(1, std::memory_order_relaxed); |
381 | |
382 | cell.id = id; |
383 | |
384 | if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) |
385 | { |
386 | std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; |
387 | cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); |
388 | } |
389 | else |
390 | cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max()); |
391 | |
392 | /// Set null_value for each attribute |
393 | cell.setDefault(); |
394 | for (auto & attribute : attributes) |
395 | setDefaultAttributeValue(attribute, cell_idx); |
396 | |
397 | /// inform caller that the cell has not been found |
398 | on_id_not_found(id, cell_idx); |
399 | } |
400 | |
401 | ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); |
402 | ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num); |
403 | ProfileEvents::increment(ProfileEvents::DictCacheRequests); |
404 | } |
405 | |
406 | } |
407 | |