#pragma once

#include <atomic>
#include <mutex>
#include <list>
#include <memory>
#include <random>
#include <pcg_random.hpp>
#include <unordered_map>
#include <sys/mman.h>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/set.hpp>
#include <boost/noncopyable.hpp>
#include <ext/scope_guard.h>

#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Common/formatReadable.h>

/// Required for older Darwin builds that lack a definition of MAP_ANONYMOUS.
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif


namespace DB
{
namespace ErrorCodes
{
    extern const int CANNOT_ALLOCATE_MEMORY;
    extern const int CANNOT_MUNMAP;
}
}

/** Cache for variable-length memory regions (contiguous arrays of bytes).
  * Example: cache for data read from disk, cache for decompressed data, etc.
  * It combines a cache and an allocator: it allocates memory by itself, without malloc/new.
  *
  * Motivation:
  * - the cache has a specified memory usage limit, and we want this limit to include allocator fragmentation overhead;
  * - the cache overcomes memory fragmentation by cache eviction;
  * - there is no sense in having reclaimed memory regions cached internally by the usual allocator (malloc/new);
  * - by allocating memory directly with mmap, we can place it in virtual address space far
  *   from other (malloc'd) memory - this helps debug memory-stomping bugs
  *   (your program will likely hit unmapped memory and get a segfault rather than silent cache corruption).
  *
  * Implementation:
  *
  * The cache is represented by a list of mmapped chunks.
  * Each chunk holds free and occupied memory regions. Contiguous free regions are always coalesced.
  *
  * Each region can be linked into the following metadata structures:
  * 1. LRU list - to find the next region for eviction. NOTE Replace with an exponentially-smoothed size-weighted LFU map.
  * 2. Adjacency list - to find neighbouring free regions to coalesce on eviction.
  * 3. Size multimap - to find a free region of at least the requested size.
  * 4. Key map - to find an element by key.
  *
  * Each region has:
  * - size;
  * - refcount: a region can be evicted only if it is not used anywhere;
  * - chunk address: to check whether regions are from the same chunk.
  *
  * During insertion, each key is locked - to avoid parallel initialization of regions for the same key.
  *
  * On insertion, we search for a free region of at least the requested size.
  * If nothing is found, we evict the oldest unused region; if it is not large enough, we also evict its neighbours; then try again.
  *
  * Metadata is allocated by the usual allocator and its memory usage is not accounted for.
  *
  * Caveats:
  * - the cache is not NUMA friendly.
  *
  * Performance: a few million ops/sec from a single thread; less under concurrency.
  * Fragmentation is usually less than 10%.
  */
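/** Usage sketch (illustrative only, not part of this header; the key/payload types, the 256 MiB limit
  * and the 'size_in_bytes' / 'fillRegion' / 'process' names below are hypothetical):
  *
  *     ArrayCache<UInt64, Payload> cache(256 * 1024 * 1024);
  *
  *     auto holder = cache.getOrSet(key,
  *         [&] { return size_in_bytes; },                            /// get_size: bytes to allocate for this key
  *         [&](void * ptr, Payload & payload) { fillRegion(ptr); },  /// initialize: fill the allocated region
  *         nullptr);                                                 /// was_calculated (optional out-parameter)
  *
  *     if (holder)    /// nullptr means the whole cache is full and in use
  *         process(holder->ptr(), holder->size());    /// the region cannot be evicted while the holder is alive
  */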

template <typename Key, typename Payload>
class ArrayCache : private boost::noncopyable
{
private:
    struct LRUListTag;
    struct AdjacencyListTag;
    struct SizeMultimapTag;
    struct KeyMapTag;

    using LRUListHook = boost::intrusive::list_base_hook<boost::intrusive::tag<LRUListTag>>;
    using AdjacencyListHook = boost::intrusive::list_base_hook<boost::intrusive::tag<AdjacencyListTag>>;
    using SizeMultimapHook = boost::intrusive::set_base_hook<boost::intrusive::tag<SizeMultimapTag>>;
    using KeyMapHook = boost::intrusive::set_base_hook<boost::intrusive::tag<KeyMapTag>>;

    struct RegionMetadata : public LRUListHook, AdjacencyListHook, SizeMultimapHook, KeyMapHook
    {
        Key key;
        Payload payload;

        union
        {
            void * ptr;
            char * char_ptr;
        };
        size_t size;
        size_t refcount = 0;
        void * chunk;

        bool operator< (const RegionMetadata & other) const { return size < other.size; }

        bool isFree() const { return SizeMultimapHook::is_linked(); }

        static RegionMetadata * create()
        {
            return new RegionMetadata;
        }

        void destroy()
        {
            delete this;
        }

    private:
        RegionMetadata() = default;
        ~RegionMetadata() = default;
    };

    struct RegionCompareBySize
    {
        bool operator() (const RegionMetadata & a, const RegionMetadata & b) const { return a.size < b.size; }
        bool operator() (const RegionMetadata & a, size_t size) const { return a.size < size; }
        bool operator() (size_t size, const RegionMetadata & b) const { return size < b.size; }
    };

    struct RegionCompareByKey
    {
        bool operator() (const RegionMetadata & a, const RegionMetadata & b) const { return a.key < b.key; }
        bool operator() (const RegionMetadata & a, Key key) const { return a.key < key; }
        bool operator() (Key key, const RegionMetadata & b) const { return key < b.key; }
    };

    using LRUList = boost::intrusive::list<RegionMetadata,
        boost::intrusive::base_hook<LRUListHook>, boost::intrusive::constant_time_size<true>>;
    using AdjacencyList = boost::intrusive::list<RegionMetadata,
        boost::intrusive::base_hook<AdjacencyListHook>, boost::intrusive::constant_time_size<true>>;
    using SizeMultimap = boost::intrusive::multiset<RegionMetadata,
        boost::intrusive::compare<RegionCompareBySize>, boost::intrusive::base_hook<SizeMultimapHook>, boost::intrusive::constant_time_size<true>>;
    using KeyMap = boost::intrusive::set<RegionMetadata,
        boost::intrusive::compare<RegionCompareByKey>, boost::intrusive::base_hook<KeyMapHook>, boost::intrusive::constant_time_size<true>>;

    /** Each region can be:
      * - free: not holding any data;
      * - allocated: holding data, addressed by key;
      * -- allocated, in use: held externally, cannot be evicted;
      * -- allocated, not in use: not held, can be evicted.
      */

    /** Invariants:
      * adjacency_list contains all regions
      * size_multimap contains free regions
      * key_map contains allocated regions
      * lru_list contains allocated regions that are not in use
      */
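    /** Summary (follows from the invariants above) of which structures link a region in each state:
      *
      *                              adjacency_list   size_multimap   key_map   lru_list
      *   free                             yes              yes          no        no
      *   allocated, in use                yes              no           yes       no
      *   allocated, not in use            yes              no           yes       yes
      */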

    LRUList lru_list;
    AdjacencyList adjacency_list;
    SizeMultimap size_multimap;
    KeyMap key_map;

    mutable std::mutex mutex;

    pcg64 rng{randomSeed()};

    struct Chunk : private boost::noncopyable
    {
        void * ptr;
        size_t size;

        Chunk(size_t size_, void * address_hint) : size(size_)
        {
            ptr = mmap(address_hint, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
            if (MAP_FAILED == ptr)
                DB::throwFromErrno("Allocator: Cannot mmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
        }

        ~Chunk()
        {
            if (ptr && 0 != munmap(ptr, size))
                DB::throwFromErrno("Allocator: Cannot munmap " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_MUNMAP);
        }

        Chunk(Chunk && other) : ptr(other.ptr), size(other.size)
        {
            other.ptr = nullptr;
        }
    };

    using Chunks = std::list<Chunk>;
    Chunks chunks;

    size_t total_chunks_size = 0;
    size_t total_allocated_size = 0;
    std::atomic<size_t> total_size_currently_initialized {0};
    size_t total_size_in_use = 0;

    /// Maximum size of the cache.
    const size_t max_total_size;

    /// We will allocate memory in chunks of at least that size.
    /// 64 MB makes mmap overhead comparable to memory throughput.
    static constexpr size_t min_chunk_size = 64 * 1024 * 1024;

    /// Cache stats.
    std::atomic<size_t> hits {0};            /// Value was in cache.
    std::atomic<size_t> concurrent_hits {0}; /// Value was calculated by another thread and we were waiting for it. Also counted in hits.
    std::atomic<size_t> misses {0};

    /// For the whole lifetime.
    size_t allocations = 0;
    size_t allocated_bytes = 0;
    size_t evictions = 0;
    size_t evicted_bytes = 0;
    size_t secondary_evictions = 0;


public:
    /// Holds a region as in use. Regions in use cannot be evicted from the cache.
    /// The constructor increments refcount and, if it has just become non-zero, removes the region from lru_list.
    /// The destructor decrements refcount and, if it has become zero, inserts the region into lru_list.
    struct Holder : private boost::noncopyable
    {
        Holder(ArrayCache & cache_, RegionMetadata & region_) : cache(cache_), region(region_)
        {
            if (++region.refcount == 1 && region.LRUListHook::is_linked())
                cache.lru_list.erase(cache.lru_list.iterator_to(region));
            cache.total_size_in_use += region.size;
        }

        ~Holder()
        {
            std::lock_guard cache_lock(cache.mutex);
            if (--region.refcount == 0)
                cache.lru_list.push_back(region);
            cache.total_size_in_use -= region.size;
        }

        void * ptr() { return region.ptr; }
        const void * ptr() const { return region.ptr; }
        size_t size() const { return region.size; }
        Key key() const { return region.key; }
        Payload & payload() { return region.payload; }
        const Payload & payload() const { return region.payload; }

    private:
        ArrayCache & cache;
        RegionMetadata & region;
    };

    using HolderPtr = std::shared_ptr<Holder>;
private:

    /// Represents a pending insertion attempt.
    struct InsertToken
    {
        InsertToken(ArrayCache & cache_) : cache(cache_) {}

        std::mutex mutex;
        bool cleaned_up = false; /// Protected by the token mutex
        HolderPtr value;         /// Protected by the token mutex

        ArrayCache & cache;
        size_t refcount = 0;     /// Protected by the cache mutex
    };

    using InsertTokens = std::unordered_map<Key, std::shared_ptr<InsertToken>>;
    InsertTokens insert_tokens;

    /// This class is responsible for removing used insert tokens from the insert_tokens map.
    /// Among several concurrent threads, the first successful one is responsible for removal. But if they all
    /// fail, then the last one is responsible.
    struct InsertTokenHolder
    {
        const Key * key = nullptr;
        std::shared_ptr<InsertToken> token;
        bool cleaned_up = false;

        InsertTokenHolder() = default;

        void acquire(const Key * key_, const std::shared_ptr<InsertToken> & token_, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
        {
            key = key_;
            token = token_;
            ++token->refcount;
        }

        void cleanup([[maybe_unused]] std::lock_guard<std::mutex> & token_lock, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
        {
            token->cache.insert_tokens.erase(*key);
            token->cleaned_up = true;
            cleaned_up = true;
        }

        ~InsertTokenHolder()
        {
            if (!token)
                return;

            if (cleaned_up)
                return;

            std::lock_guard token_lock(token->mutex);

            if (token->cleaned_up)
                return;

            std::lock_guard cache_lock(token->cache.mutex);

            --token->refcount;
            if (token->refcount == 0)
                cleanup(token_lock, cache_lock);
        }
    };

    friend struct InsertTokenHolder;


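    /// Rounds x up to a multiple of 'rounding' (illustrative values: roundUp(13, 16) == 16,
    /// roundUp(16, 16) == 16, roundUp(17, 16) == 32). Assumes 'rounding' is non-zero;
    /// callers pass 'alignment' or 'page_size'.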
    static size_t roundUp(size_t x, size_t rounding)
    {
        return (x + (rounding - 1)) / rounding * rounding;
    }

    static constexpr size_t page_size = 4096;

    /// Sizes and addresses of allocated memory will be aligned to the specified boundary.
    static constexpr size_t alignment = 16;


    /// Precondition: region is not in lru_list, not in key_map, not in size_multimap.
    /// Postcondition: region is not in lru_list, not in key_map,
    ///  inserted into size_multimap, possibly coalesced with adjacent free regions.
    void freeRegion(RegionMetadata & region) noexcept
    {
        auto adjacency_list_it = adjacency_list.iterator_to(region);

        auto left_it = adjacency_list_it;
        auto right_it = adjacency_list_it;

        //size_t was_size = region.size;

        if (left_it != adjacency_list.begin())
        {
            --left_it;

            //std::cerr << "left_it->isFree(): " << left_it->isFree() << "\n";

            if (left_it->chunk == region.chunk && left_it->isFree())
            {
                region.size += left_it->size;
                region.char_ptr -= left_it->size;
                size_multimap.erase(size_multimap.iterator_to(*left_it));
                adjacency_list.erase_and_dispose(left_it, [](RegionMetadata * elem) { elem->destroy(); });
            }
        }

        ++right_it;
        if (right_it != adjacency_list.end())
        {
            //std::cerr << "right_it->isFree(): " << right_it->isFree() << "\n";

            if (right_it->chunk == region.chunk && right_it->isFree())
            {
                region.size += right_it->size;
                size_multimap.erase(size_multimap.iterator_to(*right_it));
                adjacency_list.erase_and_dispose(right_it, [](RegionMetadata * elem) { elem->destroy(); });
            }
        }

        //std::cerr << "size is enlarged: " << was_size << " -> " << region.size << "\n";

        size_multimap.insert(region);
    }


    void evictRegion(RegionMetadata & evicted_region) noexcept
    {
        total_allocated_size -= evicted_region.size;

        lru_list.erase(lru_list.iterator_to(evicted_region));

        if (evicted_region.KeyMapHook::is_linked())
            key_map.erase(key_map.iterator_to(evicted_region));

        ++evictions;
        evicted_bytes += evicted_region.size;

        freeRegion(evicted_region);
    }


    /// Evicts a region from the cache and returns it, coalesced with nearby free regions.
    /// While the size is not enough, evicts adjacent regions to the right, if any.
    /// If there is nothing to evict, returns nullptr.
    /// The region is removed from lru_list and key_map and inserted into size_multimap.
    RegionMetadata * evictSome(size_t requested_size) noexcept
    {
        if (lru_list.empty())
            return nullptr;

        /*for (const auto & elem : adjacency_list)
            std::cerr << (!elem.SizeMultimapHook::is_linked() ? "\033[1m" : "") << elem.size << (!elem.SizeMultimapHook::is_linked() ? "\033[0m " : " ");
        std::cerr << '\n';*/

        auto it = adjacency_list.iterator_to(lru_list.front());

        while (true)
        {
            RegionMetadata & evicted_region = *it;
            evictRegion(evicted_region);

            if (evicted_region.size >= requested_size)
                return &evicted_region;

            ++it;
            if (it == adjacency_list.end() || it->chunk != evicted_region.chunk || !it->LRUListHook::is_linked())
                return &evicted_region;

            ++secondary_evictions;
        }
    }


    /// Allocates a chunk of the specified size. Creates a free region spanning the whole chunk and returns it.
    RegionMetadata * addNewChunk(size_t size)
    {
        /// ASLR by hand.
        void * address_hint = reinterpret_cast<void *>(std::uniform_int_distribution<size_t>(0x100000000000UL, 0x700000000000UL)(rng));

        chunks.emplace_back(size, address_hint);
        Chunk & chunk = chunks.back();

        total_chunks_size += size;

        /// Create a free region spanning the whole chunk.
        RegionMetadata * free_region;
        try
        {
            free_region = RegionMetadata::create();
        }
        catch (...)
        {
            total_chunks_size -= size;
            chunks.pop_back();
            throw;
        }

        free_region->ptr = chunk.ptr;
        free_region->chunk = chunk.ptr;
        free_region->size = chunk.size;

        adjacency_list.push_back(*free_region);
        size_multimap.insert(*free_region);

        return free_region;
    }


    /// Precondition: free_region.size >= size.
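    /// If the free region is larger than requested, it is split: a new region of exactly 'size' bytes
    /// is carved from its beginning and the remainder stays free. Illustrative sketch (sizes arbitrary):
    ///
    ///     before:  [ free 100 ]
    ///     after:   [ allocated 24 ][ free 76 ]    /// allocated region is linked right before the shrunken free one in adjacency_list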
    RegionMetadata * allocateFromFreeRegion(RegionMetadata & free_region, size_t size)
    {
        ++allocations;
        allocated_bytes += size;

        if (free_region.size == size)
        {
            total_allocated_size += size;
            size_multimap.erase(size_multimap.iterator_to(free_region));
            return &free_region;
        }

        RegionMetadata * allocated_region = RegionMetadata::create();
        total_allocated_size += size;

        allocated_region->ptr = free_region.ptr;
        allocated_region->chunk = free_region.chunk;
        allocated_region->size = size;

        size_multimap.erase(size_multimap.iterator_to(free_region));
        free_region.size -= size;
        free_region.char_ptr += size;
        size_multimap.insert(free_region);

        adjacency_list.insert(adjacency_list.iterator_to(free_region), *allocated_region);
        return allocated_region;
    }


    /// Does not insert the allocated region into key_map or lru_list. The caller must do it.
    RegionMetadata * allocate(size_t size)
    {
        size = roundUp(size, alignment);

        /// Look up the size multimap to find a free region of at least the requested size.
        auto it = size_multimap.lower_bound(size, RegionCompareBySize());
        if (size_multimap.end() != it)
        {
            return allocateFromFreeRegion(*it, size);
        }

        /// If nothing was found and the total size of allocated chunks plus the required size is below the maximum,
        /// allocate a new chunk.
        size_t required_chunk_size = std::max(min_chunk_size, roundUp(size, page_size));
        if (total_chunks_size + required_chunk_size <= max_total_size)
        {
            /// Create a free region spanning the whole chunk.
            RegionMetadata * free_region = addNewChunk(required_chunk_size);
            return allocateFromFreeRegion(*free_region, size);
        }

//        std::cerr << "Requested size: " << size << "\n";

        /// Evict something from the cache and continue.
        while (true)
        {
            RegionMetadata * res = evictSome(size);

            /// Nothing to evict. The whole cache is full and in use - cannot allocate memory.
            if (!res)
                return nullptr;

            /// Not enough. Evict more.
            if (res->size < size)
                continue;

            return allocateFromFreeRegion(*res, size);
        }
    }


public:
    ArrayCache(size_t max_total_size_) : max_total_size(max_total_size_)
    {
    }

    ~ArrayCache()
    {
        std::lock_guard cache_lock(mutex);

        key_map.clear();
        lru_list.clear();
        size_multimap.clear();
        adjacency_list.clear_and_dispose([](RegionMetadata * elem) { elem->destroy(); });
    }


    /// If the value for the key is in the cache, returns it.
    ///
    /// If it is not, calls 'get_size' to obtain the required size of storage for the key,
    /// then allocates storage and calls 'initialize' for the necessary initialization before the data can be used.
    ///
    /// Only one of several concurrent threads calling this method will call get_size or initialize;
    /// the others will wait for that call to complete and will use its result (this helps prevent cache stampede).
    ///
    /// Exceptions occurring in callbacks will be propagated to the caller.
    /// Another thread from the set of concurrent threads will then try to call its callbacks, etc.
    ///
    /// Returns the cached value wrapped in a holder, preventing the cache entry from eviction.
    /// Also reports, via 'was_calculated' (if not nullptr), whether the value was produced during this call.
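    ///
    /// Expected callback shapes (illustrative; any callables with compatible signatures will do):
    ///     get_size:   ()                      -> size_t    /// number of bytes to allocate for the key
    ///     initialize: (void * ptr, Payload &) -> void      /// fill the freshly allocated region and the payload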
    template <typename GetSizeFunc, typename InitializeFunc>
    HolderPtr getOrSet(const Key & key, GetSizeFunc && get_size, InitializeFunc && initialize, bool * was_calculated)
    {
        InsertTokenHolder token_holder;
        {
            std::lock_guard cache_lock(mutex);

            auto it = key_map.find(key, RegionCompareByKey());
            if (key_map.end() != it)
            {
                ++hits;
                if (was_calculated)
                    *was_calculated = false;

                return std::make_shared<Holder>(*this, *it);
            }

            auto & token = insert_tokens[key];
            if (!token)
                token = std::make_shared<InsertToken>(*this);

            token_holder.acquire(&key, token, cache_lock);
        }

        InsertToken * token = token_holder.token.get();

        std::lock_guard token_lock(token->mutex);

        token_holder.cleaned_up = token->cleaned_up;

        if (token->value)
        {
            /// Another thread already produced the value while we waited for token->mutex.
            ++hits;
            ++concurrent_hits;
            if (was_calculated)
                *was_calculated = false;

            return token->value;
        }

        ++misses;

        size_t size = get_size();

        RegionMetadata * region;
        {
            std::lock_guard cache_lock(mutex);
            region = allocate(size);
        }

        /// Cannot allocate memory.
        if (!region)
            return {};

        region->key = key;

        {
            total_size_currently_initialized += size;
            SCOPE_EXIT({ total_size_currently_initialized -= size; });

            try
            {
                initialize(region->ptr, region->payload);
            }
            catch (...)
            {
                {
                    std::lock_guard cache_lock(mutex);
                    freeRegion(*region);
                }
                throw;
            }
        }

        std::lock_guard cache_lock(mutex);

        try
        {
            token->value = std::make_shared<Holder>(*this, *region);

            /// Insert the new value only if the token is still present in insert_tokens.
            /// (The token may be absent because of a concurrent reset() call.)
            auto token_it = insert_tokens.find(key);
            if (token_it != insert_tokens.end() && token_it->second.get() == token)
            {
                key_map.insert(*region);
            }

            if (!token->cleaned_up)
                token_holder.cleanup(token_lock, cache_lock);

            if (was_calculated)
                *was_calculated = true;

            return token->value;
        }
        catch (...)
        {
            if (region->KeyMapHook::is_linked())
                key_map.erase(key_map.iterator_to(*region));

            freeRegion(*region);
            throw;
        }
    }


    struct Statistics
    {
        size_t total_chunks_size = 0;
        size_t total_allocated_size = 0;
        size_t total_size_currently_initialized = 0;
        size_t total_size_in_use = 0;

        size_t num_chunks = 0;
        size_t num_regions = 0;
        size_t num_free_regions = 0;
        size_t num_regions_in_use = 0;
        size_t num_keyed_regions = 0;

        size_t hits = 0;
        size_t concurrent_hits = 0;
        size_t misses = 0;

        size_t allocations = 0;
        size_t allocated_bytes = 0;
        size_t evictions = 0;
        size_t evicted_bytes = 0;
        size_t secondary_evictions = 0;
    };

    Statistics getStatistics() const
    {
        std::lock_guard cache_lock(mutex);
        Statistics res;

        res.total_chunks_size = total_chunks_size;
        res.total_allocated_size = total_allocated_size;
        res.total_size_currently_initialized = total_size_currently_initialized.load(std::memory_order_relaxed);
        res.total_size_in_use = total_size_in_use;

        res.num_chunks = chunks.size();
        res.num_regions = adjacency_list.size();
        res.num_free_regions = size_multimap.size();
        res.num_regions_in_use = adjacency_list.size() - size_multimap.size() - lru_list.size();
        res.num_keyed_regions = key_map.size();

        res.hits = hits.load(std::memory_order_relaxed);
        res.concurrent_hits = concurrent_hits.load(std::memory_order_relaxed);
        res.misses = misses.load(std::memory_order_relaxed);

        res.allocations = allocations;
        res.allocated_bytes = allocated_bytes;
        res.evictions = evictions;
        res.evicted_bytes = evicted_bytes;
        res.secondary_evictions = secondary_evictions;

        return res;
    }
};

template <typename Key, typename Payload> constexpr size_t ArrayCache<Key, Payload>::min_chunk_size;