1#include "duckdb/storage/buffer/buffer_pool.hpp"
2#include "duckdb/parallel/concurrentqueue.hpp"
3#include "duckdb/common/exception.hpp"
4
5namespace duckdb {
6
7typedef duckdb_moodycamel::ConcurrentQueue<BufferEvictionNode> eviction_queue_t;
8
9struct EvictionQueue {
10 eviction_queue_t q;
11};
12
13bool BufferEvictionNode::CanUnload(BlockHandle &handle_p) {
14 if (timestamp != handle_p.eviction_timestamp) {
15 // handle was used in between
16 return false;
17 }
18 return handle_p.CanUnload();
19}
20
21shared_ptr<BlockHandle> BufferEvictionNode::TryGetBlockHandle() {
22 auto handle_p = handle.lock();
23 if (!handle_p) {
24 // BlockHandle has been destroyed
25 return nullptr;
26 }
27 if (!CanUnload(handle_p&: *handle_p)) {
28 // handle was used in between
29 return nullptr;
30 }
31 // this is the latest node in the queue with this handle
32 return handle_p;
33}
34
35BufferPool::BufferPool(idx_t maximum_memory)
36 : current_memory(0), maximum_memory(maximum_memory), queue(make_uniq<EvictionQueue>()), queue_insertions(0) {
37}
38BufferPool::~BufferPool() {
39}
40
41void BufferPool::AddToEvictionQueue(shared_ptr<BlockHandle> &handle) {
42 constexpr int INSERT_INTERVAL = 1024;
43
44 D_ASSERT(handle->readers == 0);
45 handle->eviction_timestamp++;
46 // After each 1024 insertions, run through the queue and purge.
47 if ((++queue_insertions % INSERT_INTERVAL) == 0) {
48 PurgeQueue();
49 }
50 queue->q.enqueue(item: BufferEvictionNode(weak_ptr<BlockHandle>(handle), handle->eviction_timestamp));
51}
52
53void BufferPool::IncreaseUsedMemory(idx_t size) {
54 current_memory += size;
55}
56
57idx_t BufferPool::GetUsedMemory() {
58 return current_memory;
59}
60idx_t BufferPool::GetMaxMemory() {
61 return maximum_memory;
62}
63
64BufferPool::EvictionResult BufferPool::EvictBlocks(idx_t extra_memory, idx_t memory_limit,
65 unique_ptr<FileBuffer> *buffer) {
66 BufferEvictionNode node;
67 TempBufferPoolReservation r(*this, extra_memory);
68 while (current_memory > memory_limit) {
69 // get a block to unpin from the queue
70 if (!queue->q.try_dequeue(item&: node)) {
71 // Failed to reserve. Adjust size of temp reservation to 0.
72 r.Resize(new_size: 0);
73 return {.success: false, .reservation: std::move(r)};
74 }
75 // get a reference to the underlying block pointer
76 auto handle = node.TryGetBlockHandle();
77 if (!handle) {
78 continue;
79 }
80 // we might be able to free this block: grab the mutex and check if we can free it
81 lock_guard<mutex> lock(handle->lock);
82 if (!node.CanUnload(handle_p&: *handle)) {
83 // something changed in the mean-time, bail out
84 continue;
85 }
86 // hooray, we can unload the block
87 if (buffer && handle->buffer->AllocSize() == extra_memory) {
88 // we can actually re-use the memory directly!
89 *buffer = handle->UnloadAndTakeBlock();
90 return {.success: true, .reservation: std::move(r)};
91 } else {
92 // release the memory and mark the block as unloaded
93 handle->Unload();
94 }
95 }
96 return {.success: true, .reservation: std::move(r)};
97}
98
99void BufferPool::PurgeQueue() {
100 BufferEvictionNode node;
101 while (true) {
102 if (!queue->q.try_dequeue(item&: node)) {
103 break;
104 }
105 auto handle = node.TryGetBlockHandle();
106 if (!handle) {
107 continue;
108 } else {
109 queue->q.enqueue(item: std::move(node));
110 break;
111 }
112 }
113}
114
115void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
116 lock_guard<mutex> l_lock(limit_lock);
117 // try to evict until the limit is reached
118 if (!EvictBlocks(extra_memory: 0, memory_limit: limit).success) {
119 throw OutOfMemoryException(
120 "Failed to change memory limit to %lld: could not free up enough memory for the new limit%s", limit,
121 exception_postscript);
122 }
123 idx_t old_limit = maximum_memory;
124 // set the global maximum memory to the new limit if successful
125 maximum_memory = limit;
126 // evict again
127 if (!EvictBlocks(extra_memory: 0, memory_limit: limit).success) {
128 // failed: go back to old limit
129 maximum_memory = old_limit;
130 throw OutOfMemoryException(
131 "Failed to change memory limit to %lld: could not free up enough memory for the new limit%s", limit,
132 exception_postscript);
133 }
134}
135
136} // namespace duckdb
137