1 | #include "duckdb/storage/buffer/buffer_pool.hpp" |
2 | #include "duckdb/parallel/concurrentqueue.hpp" |
3 | #include "duckdb/common/exception.hpp" |
4 | |
5 | namespace duckdb { |
6 | |
7 | typedef duckdb_moodycamel::ConcurrentQueue<BufferEvictionNode> eviction_queue_t; |
8 | |
9 | struct EvictionQueue { |
10 | eviction_queue_t q; |
11 | }; |
12 | |
13 | bool BufferEvictionNode::CanUnload(BlockHandle &handle_p) { |
14 | if (timestamp != handle_p.eviction_timestamp) { |
15 | // handle was used in between |
16 | return false; |
17 | } |
18 | return handle_p.CanUnload(); |
19 | } |
20 | |
21 | shared_ptr<BlockHandle> BufferEvictionNode::TryGetBlockHandle() { |
22 | auto handle_p = handle.lock(); |
23 | if (!handle_p) { |
24 | // BlockHandle has been destroyed |
25 | return nullptr; |
26 | } |
27 | if (!CanUnload(handle_p&: *handle_p)) { |
28 | // handle was used in between |
29 | return nullptr; |
30 | } |
31 | // this is the latest node in the queue with this handle |
32 | return handle_p; |
33 | } |
34 | |
35 | BufferPool::BufferPool(idx_t maximum_memory) |
36 | : current_memory(0), maximum_memory(maximum_memory), queue(make_uniq<EvictionQueue>()), queue_insertions(0) { |
37 | } |
38 | BufferPool::~BufferPool() { |
39 | } |
40 | |
41 | void BufferPool::AddToEvictionQueue(shared_ptr<BlockHandle> &handle) { |
42 | constexpr int INSERT_INTERVAL = 1024; |
43 | |
44 | D_ASSERT(handle->readers == 0); |
45 | handle->eviction_timestamp++; |
46 | // After each 1024 insertions, run through the queue and purge. |
47 | if ((++queue_insertions % INSERT_INTERVAL) == 0) { |
48 | PurgeQueue(); |
49 | } |
50 | queue->q.enqueue(item: BufferEvictionNode(weak_ptr<BlockHandle>(handle), handle->eviction_timestamp)); |
51 | } |
52 | |
53 | void BufferPool::IncreaseUsedMemory(idx_t size) { |
54 | current_memory += size; |
55 | } |
56 | |
57 | idx_t BufferPool::GetUsedMemory() { |
58 | return current_memory; |
59 | } |
60 | idx_t BufferPool::GetMaxMemory() { |
61 | return maximum_memory; |
62 | } |
63 | |
64 | BufferPool::EvictionResult BufferPool::EvictBlocks(idx_t , idx_t memory_limit, |
65 | unique_ptr<FileBuffer> *buffer) { |
66 | BufferEvictionNode node; |
67 | TempBufferPoolReservation r(*this, extra_memory); |
68 | while (current_memory > memory_limit) { |
69 | // get a block to unpin from the queue |
70 | if (!queue->q.try_dequeue(item&: node)) { |
71 | // Failed to reserve. Adjust size of temp reservation to 0. |
72 | r.Resize(new_size: 0); |
73 | return {.success: false, .reservation: std::move(r)}; |
74 | } |
75 | // get a reference to the underlying block pointer |
76 | auto handle = node.TryGetBlockHandle(); |
77 | if (!handle) { |
78 | continue; |
79 | } |
80 | // we might be able to free this block: grab the mutex and check if we can free it |
81 | lock_guard<mutex> lock(handle->lock); |
82 | if (!node.CanUnload(handle_p&: *handle)) { |
83 | // something changed in the mean-time, bail out |
84 | continue; |
85 | } |
86 | // hooray, we can unload the block |
87 | if (buffer && handle->buffer->AllocSize() == extra_memory) { |
88 | // we can actually re-use the memory directly! |
89 | *buffer = handle->UnloadAndTakeBlock(); |
90 | return {.success: true, .reservation: std::move(r)}; |
91 | } else { |
92 | // release the memory and mark the block as unloaded |
93 | handle->Unload(); |
94 | } |
95 | } |
96 | return {.success: true, .reservation: std::move(r)}; |
97 | } |
98 | |
99 | void BufferPool::PurgeQueue() { |
100 | BufferEvictionNode node; |
101 | while (true) { |
102 | if (!queue->q.try_dequeue(item&: node)) { |
103 | break; |
104 | } |
105 | auto handle = node.TryGetBlockHandle(); |
106 | if (!handle) { |
107 | continue; |
108 | } else { |
109 | queue->q.enqueue(item: std::move(node)); |
110 | break; |
111 | } |
112 | } |
113 | } |
114 | |
115 | void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) { |
116 | lock_guard<mutex> l_lock(limit_lock); |
117 | // try to evict until the limit is reached |
118 | if (!EvictBlocks(extra_memory: 0, memory_limit: limit).success) { |
119 | throw OutOfMemoryException( |
120 | "Failed to change memory limit to %lld: could not free up enough memory for the new limit%s" , limit, |
121 | exception_postscript); |
122 | } |
123 | idx_t old_limit = maximum_memory; |
124 | // set the global maximum memory to the new limit if successful |
125 | maximum_memory = limit; |
126 | // evict again |
127 | if (!EvictBlocks(extra_memory: 0, memory_limit: limit).success) { |
128 | // failed: go back to old limit |
129 | maximum_memory = old_limit; |
130 | throw OutOfMemoryException( |
131 | "Failed to change memory limit to %lld: could not free up enough memory for the new limit%s" , limit, |
132 | exception_postscript); |
133 | } |
134 | } |
135 | |
136 | } // namespace duckdb |
137 | |