1#include "duckdb/storage/buffer_manager.hpp"
2
3#include "duckdb/common/exception.hpp"
4
5using namespace duckdb;
6using namespace std;
7
8BufferManager::BufferManager(FileSystem &fs, BlockManager &manager, string tmp, idx_t maximum_memory)
9 : fs(fs), manager(manager), current_memory(0), maximum_memory(maximum_memory), temp_directory(move(tmp)),
10 temporary_id(MAXIMUM_BLOCK) {
11 if (!temp_directory.empty()) {
12 fs.CreateDirectory(temp_directory);
13 }
14}
15
16BufferManager::~BufferManager() {
17 if (!temp_directory.empty()) {
18 fs.RemoveDirectory(temp_directory);
19 }
20}
21
22unique_ptr<BufferHandle> BufferManager::Pin(block_id_t block_id, bool can_destroy) {
23 // first obtain a lock on the set of blocks
24 lock_guard<mutex> lock(block_lock);
25 if (block_id < MAXIMUM_BLOCK) {
26 return PinBlock(block_id);
27 } else {
28 return PinBuffer(block_id, can_destroy);
29 }
30}
31
32unique_ptr<BufferHandle> BufferManager::PinBlock(block_id_t block_id) {
33 // this method should only be used to pin blocks that exist in the file
34 assert(block_id < MAXIMUM_BLOCK);
35
36 // check if the block is already loaded
37 Block *result_block;
38 auto entry = blocks.find(block_id);
39 if (entry == blocks.end()) {
40 // block is not loaded, load the block
41 current_memory += Storage::BLOCK_ALLOC_SIZE;
42 unique_ptr<Block> block;
43 if (current_memory > maximum_memory) {
44 // not enough memory to hold the block: have to evict a block first
45 block = EvictBlock();
46 if (!block) {
47 // evicted a managed buffer: no block returned
48 // create a new block
49 block = make_unique<Block>(block_id);
50 } else {
51 // take over the evicted block and use it to hold this block
52 block->id = block_id;
53 }
54 } else {
55 // enough memory to create a new block: allocate it
56 block = make_unique<Block>(block_id);
57 }
58 manager.Read(*block);
59 result_block = block.get();
60 // create a new buffer entry for this block and insert it into the block list
61 auto buffer_entry = make_unique<BufferEntry>(move(block));
62 blocks.insert(make_pair(block_id, buffer_entry.get()));
63 used_list.Append(move(buffer_entry));
64 } else {
65 auto buffer = entry->second->buffer.get();
66 assert(buffer->type == FileBufferType::BLOCK);
67 result_block = (Block *)buffer;
68 // add one to the reference count
69 AddReference(entry->second);
70 }
71 return make_unique<BufferHandle>(*this, block_id, result_block);
72}
73
74void BufferManager::AddReference(BufferEntry *entry) {
75 entry->ref_count++;
76 if (entry->ref_count == 1) {
77 // ref count is 1, that means it used to be 0 (unused)
78 // move from lru to used_list
79 auto current_entry = lru.Erase(entry);
80 used_list.Append(move(current_entry));
81 }
82}
83
84void BufferManager::Unpin(block_id_t block_id) {
85 lock_guard<mutex> lock(block_lock);
86 // first find the block in the set of blocks
87 auto entry = blocks.find(block_id);
88 assert(entry != blocks.end());
89
90 auto buffer_entry = entry->second;
91 // then decerase the ref count
92 assert(buffer_entry->ref_count > 0);
93 buffer_entry->ref_count--;
94 if (buffer_entry->ref_count == 0) {
95 if (buffer_entry->buffer->type == FileBufferType::MANAGED_BUFFER) {
96 auto managed = (ManagedBuffer *)buffer_entry->buffer.get();
97 if (managed->can_destroy) {
98 // this is a managed buffer that we can destroy
99 // instead of adding it to the LRU list, just deallocate the managed buffer immediately
100 current_memory -= managed->size;
101 return;
102 }
103 }
104 // no references left: move block out of used list and into lru list
105 auto entry = used_list.Erase(buffer_entry);
106 lru.Append(move(entry));
107 }
108}
109
110unique_ptr<Block> BufferManager::EvictBlock() {
111 if (temp_directory.empty()) {
112 throw Exception("Out-of-memory: cannot evict buffer because no temporary directory is specified!\nTo enable "
113 "temporary buffer eviction set a temporary directory in the configuration");
114 }
115 // pop the first entry from the lru list
116 auto entry = lru.Pop();
117 if (!entry) {
118 throw Exception("Not enough memory to complete operation!");
119 }
120 assert(entry->ref_count == 0);
121 // erase this identifier from the set of blocks
122 auto buffer = entry->buffer.get();
123 if (buffer->type == FileBufferType::BLOCK) {
124 // block buffer: remove the block and reuse it
125 auto block = (Block *)buffer;
126 blocks.erase(block->id);
127 // free up the memory
128 current_memory -= Storage::BLOCK_ALLOC_SIZE;
129 // finally return the block obtained from the current entry
130 return unique_ptr_cast<FileBuffer, Block>(move(entry->buffer));
131 } else {
132 // managed buffer: cannot return a block here
133 auto managed = (ManagedBuffer *)buffer;
134 assert(!managed->can_destroy);
135
136 // cannot destroy this buffer: write it to disk first so it can be reloaded later
137 WriteTemporaryBuffer(*managed);
138
139 blocks.erase(managed->id);
140 // free up the memory
141 current_memory -= managed->size;
142 return nullptr;
143 }
144}
145
146unique_ptr<BufferHandle> BufferManager::Allocate(idx_t alloc_size, bool can_destroy) {
147 assert(alloc_size >= Storage::BLOCK_ALLOC_SIZE);
148
149 lock_guard<mutex> lock(block_lock);
150 // first evict blocks until we have enough memory to store this buffer
151 while (current_memory + alloc_size > maximum_memory) {
152 EvictBlock();
153 }
154 // now allocate the buffer with a new temporary id
155 auto temp_id = ++temporary_id;
156 auto buffer = make_unique<ManagedBuffer>(*this, alloc_size, can_destroy, temp_id);
157 auto managed_buffer = buffer.get();
158 current_memory += buffer->AllocSize();
159 // create a new entry and append it to the used list
160 auto buffer_entry = make_unique<BufferEntry>(move(buffer));
161 blocks.insert(make_pair(temp_id, buffer_entry.get()));
162 used_list.Append(move(buffer_entry));
163 // now return a handle to the entry
164 return make_unique<BufferHandle>(*this, temp_id, managed_buffer);
165}
166
167void BufferManager::DestroyBuffer(block_id_t buffer_id, bool can_destroy) {
168 lock_guard<mutex> lock(block_lock);
169
170 assert(buffer_id >= MAXIMUM_BLOCK);
171 // this is like unpin, except we just destroy the entry entirely instead of adding it to the LRU list
172 // first find the block in the set of blocks
173 auto entry = blocks.find(buffer_id);
174 if (entry == blocks.end()) {
175 // buffer is not currently loaded into memory
176 // check if it was offloaded to disk instead
177 if (!can_destroy) {
178 // buffer was offloaded to disk: remove the file instead
179 DeleteTemporaryFile(buffer_id);
180 }
181 return;
182 }
183
184 auto handle = entry->second;
185 assert(handle->ref_count == 0);
186
187 current_memory -= handle->buffer->AllocSize();
188 blocks.erase(buffer_id);
189 lru.Erase(handle);
190}
191
192void BufferManager::SetLimit(idx_t limit) {
193 lock_guard<mutex> lock(block_lock);
194
195 while (current_memory > limit) {
196 EvictBlock();
197 }
198 maximum_memory = limit;
199}
200
201unique_ptr<BufferHandle> BufferManager::PinBuffer(block_id_t buffer_id, bool can_destroy) {
202 assert(buffer_id >= MAXIMUM_BLOCK);
203 // check if we have this buffer here
204 auto entry = blocks.find(buffer_id);
205 if (entry == blocks.end()) {
206 if (can_destroy) {
207 // buffer was destroyed: return nullptr
208 return nullptr;
209 } else {
210 // buffer was unloaded but not destroyed: read from disk
211 return ReadTemporaryBuffer(buffer_id);
212 }
213 }
214 // we still have the buffer, add a reference to it
215 auto buffer = entry->second->buffer.get();
216 AddReference(entry->second);
217 // now return it
218 assert(buffer->type == FileBufferType::MANAGED_BUFFER);
219 auto managed = (ManagedBuffer *)buffer;
220 assert(managed->id == buffer_id);
221 return make_unique<BufferHandle>(*this, buffer_id, managed);
222}
223
224string BufferManager::GetTemporaryPath(block_id_t id) {
225 return fs.JoinPath(temp_directory, to_string(id) + ".block");
226}
227
228void BufferManager::WriteTemporaryBuffer(ManagedBuffer &buffer) {
229 assert(buffer.size + Storage::BLOCK_HEADER_SIZE >= Storage::BLOCK_ALLOC_SIZE);
230 // get the path to write to
231 auto path = GetTemporaryPath(buffer.id);
232 // create the file and write the size followed by the buffer contents
233 auto handle = fs.OpenFile(path, FileFlags::WRITE | FileFlags::CREATE);
234 handle->Write(&buffer.size, sizeof(idx_t), 0);
235 buffer.Write(*handle, sizeof(idx_t));
236}
237
238unique_ptr<BufferHandle> BufferManager::ReadTemporaryBuffer(block_id_t id) {
239 if (temp_directory.empty()) {
240 throw Exception("Out-of-memory: cannot read buffer because no temporary directory is specified!\nTo enable "
241 "temporary buffer eviction set a temporary directory in the configuration");
242 }
243 idx_t alloc_size;
244 // open the temporary file and read the size
245 auto path = GetTemporaryPath(id);
246 auto handle = fs.OpenFile(path, FileFlags::READ);
247 handle->Read(&alloc_size, sizeof(idx_t), 0);
248 // first evict blocks until we can handle the size
249 while (current_memory + alloc_size > maximum_memory) {
250 EvictBlock();
251 }
252 // now allocate a buffer of this size and read the data into that buffer
253 auto buffer = make_unique<ManagedBuffer>(*this, alloc_size + Storage::BLOCK_HEADER_SIZE, false, id);
254 buffer->Read(*handle, sizeof(idx_t));
255
256 auto managed_buffer = buffer.get();
257 current_memory += buffer->AllocSize();
258 // create a new entry and append it to the used list
259 auto buffer_entry = make_unique<BufferEntry>(move(buffer));
260 blocks.insert(make_pair(id, buffer_entry.get()));
261 used_list.Append(move(buffer_entry));
262 // now return a handle to the entry
263 return make_unique<BufferHandle>(*this, id, managed_buffer);
264}
265
266void BufferManager::DeleteTemporaryFile(block_id_t id) {
267 auto path = GetTemporaryPath(id);
268 if (fs.FileExists(path)) {
269 fs.RemoveFile(path);
270 }
271}
272