1#include "duckdb/storage/standard_buffer_manager.hpp"
2
3#include "duckdb/common/allocator.hpp"
4#include "duckdb/common/exception.hpp"
5#include "duckdb/common/set.hpp"
6#include "duckdb/storage/in_memory_block_manager.hpp"
7#include "duckdb/storage/storage_manager.hpp"
8#include "duckdb/main/attached_database.hpp"
9#include "duckdb/main/database.hpp"
10#include "duckdb/storage/buffer/buffer_pool.hpp"
11
12namespace duckdb {
13
14struct BufferAllocatorData : PrivateAllocatorData {
15 explicit BufferAllocatorData(StandardBufferManager &manager) : manager(manager) {
16 }
17
18 StandardBufferManager &manager;
19};
20
21unique_ptr<FileBuffer> StandardBufferManager::ConstructManagedBuffer(idx_t size, unique_ptr<FileBuffer> &&source,
22 FileBufferType type) {
23 unique_ptr<FileBuffer> result;
24 if (source) {
25 auto tmp = std::move(source);
26 D_ASSERT(tmp->AllocSize() == BufferManager::GetAllocSize(size));
27 result = make_uniq<FileBuffer>(args&: *tmp, args&: type);
28 } else {
29 // no re-usable buffer: allocate a new buffer
30 result = make_uniq<FileBuffer>(args&: Allocator::Get(db), args&: type, args&: size);
31 }
32 result->Initialize(info: DBConfig::GetConfig(db).options.debug_initialize);
33 return result;
34}
35
36class TemporaryFileManager;
37
38class TemporaryDirectoryHandle {
39public:
40 TemporaryDirectoryHandle(DatabaseInstance &db, string path_p);
41 ~TemporaryDirectoryHandle();
42
43 TemporaryFileManager &GetTempFile();
44
45private:
46 DatabaseInstance &db;
47 string temp_directory;
48 bool created_directory = false;
49 unique_ptr<TemporaryFileManager> temp_file;
50};
51
52void StandardBufferManager::SetTemporaryDirectory(const string &new_dir) {
53 if (temp_directory_handle) {
54 throw NotImplementedException("Cannot switch temporary directory after the current one has been used");
55 }
56 this->temp_directory = new_dir;
57}
58
59StandardBufferManager::StandardBufferManager(DatabaseInstance &db, string tmp)
60 : BufferManager(), db(db), buffer_pool(db.GetBufferPool()), temp_directory(std::move(tmp)),
61 temporary_id(MAXIMUM_BLOCK), buffer_allocator(BufferAllocatorAllocate, BufferAllocatorFree,
62 BufferAllocatorRealloc, make_uniq<BufferAllocatorData>(args&: *this)) {
63 temp_block_manager = make_uniq<InMemoryBlockManager>(args&: *this);
64}
65
66StandardBufferManager::~StandardBufferManager() {
67}
68
69BufferPool &StandardBufferManager::GetBufferPool() {
70 return buffer_pool;
71}
72
73idx_t StandardBufferManager::GetUsedMemory() const {
74 return buffer_pool.GetUsedMemory();
75}
76idx_t StandardBufferManager::GetMaxMemory() const {
77 return buffer_pool.GetMaxMemory();
78}
79
80// POTENTIALLY PROBLEMATIC
81// void StandardBufferManager::IncreaseUsedMemory(idx_t size, bool unsafe) {
82// if (!unsafe && buffer_pool.GetUsedMemory() + size > buffer_pool.GetMaxMemory()) {
83// throw OutOfMemoryException("Failed to allocate data of size %lld%s", size, InMemoryWarning());
84// }
85// buffer_pool.IncreaseUsedMemory(size);
86//}
87
88template <typename... ARGS>
89TempBufferPoolReservation StandardBufferManager::EvictBlocksOrThrow(idx_t memory_delta, unique_ptr<FileBuffer> *buffer,
90 ARGS... args) {
91 auto r = buffer_pool.EvictBlocks(extra_memory: memory_delta, memory_limit: buffer_pool.maximum_memory, buffer);
92 if (!r.success) {
93 string extra_text = StringUtil::Format(fmt_str: " (%s/%s used)", params: StringUtil::BytesToHumanReadableString(bytes: GetUsedMemory()),
94 params: StringUtil::BytesToHumanReadableString(bytes: GetMaxMemory()));
95 extra_text += InMemoryWarning();
96 throw OutOfMemoryException(args..., extra_text);
97 }
98 return std::move(r.reservation);
99}
100
101shared_ptr<BlockHandle> StandardBufferManager::RegisterSmallMemory(idx_t block_size) {
102 D_ASSERT(block_size < Storage::BLOCK_SIZE);
103 auto res = EvictBlocksOrThrow(memory_delta: block_size, buffer: nullptr, args: "could not allocate block of size %s%s",
104 args: StringUtil::BytesToHumanReadableString(bytes: block_size));
105
106 auto buffer = ConstructManagedBuffer(size: block_size, source: nullptr, type: FileBufferType::TINY_BUFFER);
107
108 // create a new block pointer for this block
109 return make_shared<BlockHandle>(args&: *temp_block_manager, args: ++temporary_id, args: std::move(buffer), args: false, args&: block_size,
110 args: std::move(res));
111}
112
113shared_ptr<BlockHandle> StandardBufferManager::RegisterMemory(idx_t block_size, bool can_destroy) {
114 D_ASSERT(block_size >= Storage::BLOCK_SIZE);
115 auto alloc_size = GetAllocSize(block_size);
116 // first evict blocks until we have enough memory to store this buffer
117 unique_ptr<FileBuffer> reusable_buffer;
118 auto res = EvictBlocksOrThrow(memory_delta: alloc_size, buffer: &reusable_buffer, args: "could not allocate block of size %s%s",
119 args: StringUtil::BytesToHumanReadableString(bytes: alloc_size));
120
121 auto buffer = ConstructManagedBuffer(size: block_size, source: std::move(reusable_buffer));
122
123 // create a new block pointer for this block
124 return make_shared<BlockHandle>(args&: *temp_block_manager, args: ++temporary_id, args: std::move(buffer), args&: can_destroy, args&: alloc_size,
125 args: std::move(res));
126}
127
128BufferHandle StandardBufferManager::Allocate(idx_t block_size, bool can_destroy, shared_ptr<BlockHandle> *block) {
129 shared_ptr<BlockHandle> local_block;
130 auto block_ptr = block ? block : &local_block;
131 *block_ptr = RegisterMemory(block_size, can_destroy);
132 return Pin(handle&: *block_ptr);
133}
134
135void StandardBufferManager::ReAllocate(shared_ptr<BlockHandle> &handle, idx_t block_size) {
136 D_ASSERT(block_size >= Storage::BLOCK_SIZE);
137 lock_guard<mutex> lock(handle->lock);
138 D_ASSERT(handle->state == BlockState::BLOCK_LOADED);
139 D_ASSERT(handle->memory_usage == handle->buffer->AllocSize());
140 D_ASSERT(handle->memory_usage == handle->memory_charge.size);
141
142 auto req = handle->buffer->CalculateMemory(user_size: block_size);
143 int64_t memory_delta = (int64_t)req.alloc_size - handle->memory_usage;
144
145 if (memory_delta == 0) {
146 return;
147 } else if (memory_delta > 0) {
148 // evict blocks until we have space to resize this block
149 auto reservation = EvictBlocksOrThrow(memory_delta, buffer: nullptr, args: "failed to resize block from %s to %s%s",
150 args: StringUtil::BytesToHumanReadableString(bytes: handle->memory_usage),
151 args: StringUtil::BytesToHumanReadableString(bytes: req.alloc_size));
152 // EvictBlocks decrements 'current_memory' for us.
153 handle->memory_charge.Merge(src: std::move(reservation));
154 } else {
155 // no need to evict blocks, but we do need to decrement 'current_memory'.
156 handle->memory_charge.Resize(new_size: req.alloc_size);
157 }
158
159 handle->ResizeBuffer(block_size, memory_delta);
160}
161
162BufferHandle StandardBufferManager::Pin(shared_ptr<BlockHandle> &handle) {
163 idx_t required_memory;
164 {
165 // lock the block
166 lock_guard<mutex> lock(handle->lock);
167 // check if the block is already loaded
168 if (handle->state == BlockState::BLOCK_LOADED) {
169 // the block is loaded, increment the reader count and return a pointer to the handle
170 handle->readers++;
171 return handle->Load(handle);
172 }
173 required_memory = handle->memory_usage;
174 }
175 // evict blocks until we have space for the current block
176 unique_ptr<FileBuffer> reusable_buffer;
177 auto reservation = EvictBlocksOrThrow(memory_delta: required_memory, buffer: &reusable_buffer, args: "failed to pin block of size %s%s",
178 args: StringUtil::BytesToHumanReadableString(bytes: required_memory));
179 // lock the handle again and repeat the check (in case anybody loaded in the mean time)
180 lock_guard<mutex> lock(handle->lock);
181 // check if the block is already loaded
182 if (handle->state == BlockState::BLOCK_LOADED) {
183 // the block is loaded, increment the reader count and return a pointer to the handle
184 handle->readers++;
185 reservation.Resize(new_size: 0);
186 return handle->Load(handle);
187 }
188 // now we can actually load the current block
189 D_ASSERT(handle->readers == 0);
190 handle->readers = 1;
191 auto buf = handle->Load(handle, buffer: std::move(reusable_buffer));
192 handle->memory_charge = std::move(reservation);
193 // In the case of a variable sized block, the buffer may be smaller than a full block.
194 int64_t delta = handle->buffer->AllocSize() - handle->memory_usage;
195 if (delta) {
196 D_ASSERT(delta < 0);
197 handle->memory_usage += delta;
198 handle->memory_charge.Resize(new_size: handle->memory_usage);
199 }
200 D_ASSERT(handle->memory_usage == handle->buffer->AllocSize());
201 return buf;
202}
203
204void StandardBufferManager::PurgeQueue() {
205 buffer_pool.PurgeQueue();
206}
207
208void StandardBufferManager::AddToEvictionQueue(shared_ptr<BlockHandle> &handle) {
209 buffer_pool.AddToEvictionQueue(handle);
210}
211
212void StandardBufferManager::VerifyZeroReaders(shared_ptr<BlockHandle> &handle) {
213#ifdef DUCKDB_DEBUG_DESTROY_BLOCKS
214 auto replacement_buffer = make_uniq<FileBuffer>(Allocator::Get(db), handle->buffer->type,
215 handle->memory_usage - Storage::BLOCK_HEADER_SIZE);
216 memcpy(replacement_buffer->buffer, handle->buffer->buffer, handle->buffer->size);
217 memset(handle->buffer->buffer, 0xa5, handle->buffer->size); // 0xa5 is default memory in debug mode
218 handle->buffer = std::move(replacement_buffer);
219#endif
220}
221
222void StandardBufferManager::Unpin(shared_ptr<BlockHandle> &handle) {
223 lock_guard<mutex> lock(handle->lock);
224 if (!handle->buffer || handle->buffer->type == FileBufferType::TINY_BUFFER) {
225 return;
226 }
227 D_ASSERT(handle->readers > 0);
228 handle->readers--;
229 if (handle->readers == 0) {
230 VerifyZeroReaders(handle);
231 buffer_pool.AddToEvictionQueue(handle);
232 }
233}
234
235void StandardBufferManager::SetLimit(idx_t limit) {
236 buffer_pool.SetLimit(limit, exception_postscript: InMemoryWarning());
237}
238
239//===--------------------------------------------------------------------===//
240// Temporary File Management
241//===--------------------------------------------------------------------===//
242unique_ptr<FileBuffer> ReadTemporaryBufferInternal(BufferManager &buffer_manager, FileHandle &handle, idx_t position,
243 idx_t size, block_id_t id, unique_ptr<FileBuffer> reusable_buffer) {
244 auto buffer = buffer_manager.ConstructManagedBuffer(size, source: std::move(reusable_buffer));
245 buffer->Read(handle, location: position);
246 return buffer;
247}
248
249struct TemporaryFileIndex {
250 explicit TemporaryFileIndex(idx_t file_index = DConstants::INVALID_INDEX,
251 idx_t block_index = DConstants::INVALID_INDEX)
252 : file_index(file_index), block_index(block_index) {
253 }
254
255 idx_t file_index;
256 idx_t block_index;
257
258public:
259 bool IsValid() {
260 return block_index != DConstants::INVALID_INDEX;
261 }
262};
263
264struct BlockIndexManager {
265 BlockIndexManager() : max_index(0) {
266 }
267
268public:
269 //! Obtains a new block index from the index manager
270 idx_t GetNewBlockIndex() {
271 auto index = GetNewBlockIndexInternal();
272 indexes_in_use.insert(x: index);
273 return index;
274 }
275
276 //! Removes an index from the block manager
277 //! Returns true if the max_index has been altered
278 bool RemoveIndex(idx_t index) {
279 // remove this block from the set of blocks
280 auto entry = indexes_in_use.find(x: index);
281 if (entry == indexes_in_use.end()) {
282 throw InternalException("RemoveIndex - index %llu not found in indexes_in_use", index);
283 }
284 indexes_in_use.erase(position: entry);
285 free_indexes.insert(x: index);
286 // check if we can truncate the file
287
288 // get the max_index in use right now
289 auto max_index_in_use = indexes_in_use.empty() ? 0 : *indexes_in_use.rbegin();
290 if (max_index_in_use < max_index) {
291 // max index in use is lower than the max_index
292 // reduce the max_index
293 max_index = indexes_in_use.empty() ? 0 : max_index_in_use + 1;
294 // we can remove any free_indexes that are larger than the current max_index
295 while (!free_indexes.empty()) {
296 auto max_entry = *free_indexes.rbegin();
297 if (max_entry < max_index) {
298 break;
299 }
300 free_indexes.erase(x: max_entry);
301 }
302 return true;
303 }
304 return false;
305 }
306
307 idx_t GetMaxIndex() {
308 return max_index;
309 }
310
311 bool HasFreeBlocks() {
312 return !free_indexes.empty();
313 }
314
315private:
316 idx_t GetNewBlockIndexInternal() {
317 if (free_indexes.empty()) {
318 return max_index++;
319 }
320 auto entry = free_indexes.begin();
321 auto index = *entry;
322 free_indexes.erase(position: entry);
323 return index;
324 }
325
326 idx_t max_index;
327 set<idx_t> free_indexes;
328 set<idx_t> indexes_in_use;
329};
330
331class TemporaryFileHandle {
332 constexpr static idx_t MAX_ALLOWED_INDEX = 4000;
333
334public:
335 TemporaryFileHandle(DatabaseInstance &db, const string &temp_directory, idx_t index)
336 : db(db), file_index(index), path(FileSystem::GetFileSystem(db).JoinPath(
337 a: temp_directory, path: "duckdb_temp_storage-" + to_string(val: index) + ".tmp")) {
338 }
339
340public:
341 struct TemporaryFileLock {
342 explicit TemporaryFileLock(mutex &mutex) : lock(mutex) {
343 }
344
345 lock_guard<mutex> lock;
346 };
347
348public:
349 TemporaryFileIndex TryGetBlockIndex() {
350 TemporaryFileLock lock(file_lock);
351 if (index_manager.GetMaxIndex() >= MAX_ALLOWED_INDEX && index_manager.HasFreeBlocks()) {
352 // file is at capacity
353 return TemporaryFileIndex();
354 }
355 // open the file handle if it does not yet exist
356 CreateFileIfNotExists(lock);
357 // fetch a new block index to write to
358 auto block_index = index_manager.GetNewBlockIndex();
359 return TemporaryFileIndex(file_index, block_index);
360 }
361
362 void WriteTemporaryFile(FileBuffer &buffer, TemporaryFileIndex index) {
363 D_ASSERT(buffer.size == Storage::BLOCK_SIZE);
364 buffer.Write(handle&: *handle, location: GetPositionInFile(index: index.block_index));
365 }
366
367 unique_ptr<FileBuffer> ReadTemporaryBuffer(block_id_t id, idx_t block_index,
368 unique_ptr<FileBuffer> reusable_buffer) {
369 return ReadTemporaryBufferInternal(buffer_manager&: BufferManager::GetBufferManager(db), handle&: *handle, position: GetPositionInFile(index: block_index),
370 size: Storage::BLOCK_SIZE, id, reusable_buffer: std::move(reusable_buffer));
371 }
372
373 void EraseBlockIndex(block_id_t block_index) {
374 // remove the block (and potentially truncate the temp file)
375 TemporaryFileLock lock(file_lock);
376 D_ASSERT(handle);
377 RemoveTempBlockIndex(lock, index: block_index);
378 }
379
380 bool DeleteIfEmpty() {
381 TemporaryFileLock lock(file_lock);
382 if (index_manager.GetMaxIndex() > 0) {
383 // there are still blocks in this file
384 return false;
385 }
386 // the file is empty: delete it
387 handle.reset();
388 auto &fs = FileSystem::GetFileSystem(db);
389 fs.RemoveFile(filename: path);
390 return true;
391 }
392
393 TemporaryFileInformation GetTemporaryFile() {
394 TemporaryFileLock lock(file_lock);
395 TemporaryFileInformation info;
396 info.path = path;
397 info.size = GetPositionInFile(index: index_manager.GetMaxIndex());
398 return info;
399 }
400
401private:
402 void CreateFileIfNotExists(TemporaryFileLock &) {
403 if (handle) {
404 return;
405 }
406 auto &fs = FileSystem::GetFileSystem(db);
407 handle = fs.OpenFile(path, flags: FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_WRITE |
408 FileFlags::FILE_FLAGS_FILE_CREATE);
409 }
410
411 void RemoveTempBlockIndex(TemporaryFileLock &, idx_t index) {
412 // remove the block index from the index manager
413 if (index_manager.RemoveIndex(index)) {
414 // the max_index that is currently in use has decreased
415 // as a result we can truncate the file
416#ifndef WIN32 // this ended up causing issues when sorting
417 auto max_index = index_manager.GetMaxIndex();
418 auto &fs = FileSystem::GetFileSystem(db);
419 fs.Truncate(handle&: *handle, new_size: GetPositionInFile(index: max_index + 1));
420#endif
421 }
422 }
423
424 idx_t GetPositionInFile(idx_t index) {
425 return index * Storage::BLOCK_ALLOC_SIZE;
426 }
427
428private:
429 DatabaseInstance &db;
430 unique_ptr<FileHandle> handle;
431 idx_t file_index;
432 string path;
433 mutex file_lock;
434 BlockIndexManager index_manager;
435};
436
437class TemporaryFileManager {
438public:
439 TemporaryFileManager(DatabaseInstance &db, const string &temp_directory_p)
440 : db(db), temp_directory(temp_directory_p) {
441 }
442
443public:
444 struct TemporaryManagerLock {
445 explicit TemporaryManagerLock(mutex &mutex) : lock(mutex) {
446 }
447
448 lock_guard<mutex> lock;
449 };
450
451 void WriteTemporaryBuffer(block_id_t block_id, FileBuffer &buffer) {
452 D_ASSERT(buffer.size == Storage::BLOCK_SIZE);
453 TemporaryFileIndex index;
454 TemporaryFileHandle *handle = nullptr;
455
456 {
457 TemporaryManagerLock lock(manager_lock);
458 // first check if we can write to an open existing file
459 for (auto &entry : files) {
460 auto &temp_file = entry.second;
461 index = temp_file->TryGetBlockIndex();
462 if (index.IsValid()) {
463 handle = entry.second.get();
464 break;
465 }
466 }
467 if (!handle) {
468 // no existing handle to write to; we need to create & open a new file
469 auto new_file_index = index_manager.GetNewBlockIndex();
470 auto new_file = make_uniq<TemporaryFileHandle>(args&: db, args&: temp_directory, args&: new_file_index);
471 handle = new_file.get();
472 files[new_file_index] = std::move(new_file);
473
474 index = handle->TryGetBlockIndex();
475 }
476 D_ASSERT(used_blocks.find(block_id) == used_blocks.end());
477 used_blocks[block_id] = index;
478 }
479 D_ASSERT(handle);
480 D_ASSERT(index.IsValid());
481 handle->WriteTemporaryFile(buffer, index);
482 }
483
484 bool HasTemporaryBuffer(block_id_t block_id) {
485 lock_guard<mutex> lock(manager_lock);
486 return used_blocks.find(x: block_id) != used_blocks.end();
487 }
488
489 unique_ptr<FileBuffer> ReadTemporaryBuffer(block_id_t id, unique_ptr<FileBuffer> reusable_buffer) {
490 TemporaryFileIndex index;
491 TemporaryFileHandle *handle;
492 {
493 TemporaryManagerLock lock(manager_lock);
494 index = GetTempBlockIndex(lock, id);
495 handle = GetFileHandle(lock, index: index.file_index);
496 }
497 auto buffer = handle->ReadTemporaryBuffer(id, block_index: index.block_index, reusable_buffer: std::move(reusable_buffer));
498 {
499 // remove the block (and potentially erase the temp file)
500 TemporaryManagerLock lock(manager_lock);
501 EraseUsedBlock(lock, id, handle, index);
502 }
503 return buffer;
504 }
505
506 void DeleteTemporaryBuffer(block_id_t id) {
507 TemporaryManagerLock lock(manager_lock);
508 auto index = GetTempBlockIndex(lock, id);
509 auto handle = GetFileHandle(lock, index: index.file_index);
510 EraseUsedBlock(lock, id, handle, index);
511 }
512
513 vector<TemporaryFileInformation> GetTemporaryFiles() {
514 lock_guard<mutex> lock(manager_lock);
515 vector<TemporaryFileInformation> result;
516 for (auto &file : files) {
517 result.push_back(x: file.second->GetTemporaryFile());
518 }
519 return result;
520 }
521
522private:
523 void EraseUsedBlock(TemporaryManagerLock &lock, block_id_t id, TemporaryFileHandle *handle,
524 TemporaryFileIndex index) {
525 auto entry = used_blocks.find(x: id);
526 if (entry == used_blocks.end()) {
527 throw InternalException("EraseUsedBlock - Block %llu not found in used blocks", id);
528 }
529 used_blocks.erase(position: entry);
530 handle->EraseBlockIndex(block_index: index.block_index);
531 if (handle->DeleteIfEmpty()) {
532 EraseFileHandle(lock, file_index: index.file_index);
533 }
534 }
535
536 TemporaryFileHandle *GetFileHandle(TemporaryManagerLock &, idx_t index) {
537 return files[index].get();
538 }
539
540 TemporaryFileIndex GetTempBlockIndex(TemporaryManagerLock &, block_id_t id) {
541 D_ASSERT(used_blocks.find(id) != used_blocks.end());
542 return used_blocks[id];
543 }
544
545 void EraseFileHandle(TemporaryManagerLock &, idx_t file_index) {
546 files.erase(x: file_index);
547 index_manager.RemoveIndex(index: file_index);
548 }
549
550private:
551 DatabaseInstance &db;
552 mutex manager_lock;
553 //! The temporary directory
554 string temp_directory;
555 //! The set of active temporary file handles
556 unordered_map<idx_t, unique_ptr<TemporaryFileHandle>> files;
557 //! map of block_id -> temporary file position
558 unordered_map<block_id_t, TemporaryFileIndex> used_blocks;
559 //! Manager of in-use temporary file indexes
560 BlockIndexManager index_manager;
561};
562
563TemporaryDirectoryHandle::TemporaryDirectoryHandle(DatabaseInstance &db, string path_p)
564 : db(db), temp_directory(std::move(path_p)), temp_file(make_uniq<TemporaryFileManager>(args&: db, args&: temp_directory)) {
565 auto &fs = FileSystem::GetFileSystem(db);
566 if (!temp_directory.empty()) {
567 if (!fs.DirectoryExists(directory: temp_directory)) {
568 fs.CreateDirectory(directory: temp_directory);
569 created_directory = true;
570 }
571 }
572}
573TemporaryDirectoryHandle::~TemporaryDirectoryHandle() {
574 // first release any temporary files
575 temp_file.reset();
576 // then delete the temporary file directory
577 auto &fs = FileSystem::GetFileSystem(db);
578 if (!temp_directory.empty()) {
579 bool delete_directory = created_directory;
580 vector<string> files_to_delete;
581 if (!created_directory) {
582 bool deleted_everything = true;
583 fs.ListFiles(directory: temp_directory, callback: [&](const string &path, bool isdir) {
584 if (isdir) {
585 deleted_everything = false;
586 return;
587 }
588 if (!StringUtil::StartsWith(str: path, prefix: "duckdb_temp_")) {
589 deleted_everything = false;
590 return;
591 }
592 files_to_delete.push_back(x: path);
593 });
594 }
595 if (delete_directory) {
596 // we want to remove all files in the directory
597 fs.RemoveDirectory(directory: temp_directory);
598 } else {
599 for (auto &file : files_to_delete) {
600 fs.RemoveFile(filename: fs.JoinPath(a: temp_directory, path: file));
601 }
602 }
603 }
604}
605
606TemporaryFileManager &TemporaryDirectoryHandle::GetTempFile() {
607 return *temp_file;
608}
609
610string StandardBufferManager::GetTemporaryPath(block_id_t id) {
611 auto &fs = FileSystem::GetFileSystem(db);
612 return fs.JoinPath(a: temp_directory, path: "duckdb_temp_block-" + to_string(val: id) + ".block");
613}
614
615void StandardBufferManager::RequireTemporaryDirectory() {
616 if (temp_directory.empty()) {
617 throw Exception(
618 "Out-of-memory: cannot write buffer because no temporary directory is specified!\nTo enable "
619 "temporary buffer eviction set a temporary directory using PRAGMA temp_directory='/path/to/tmp.tmp'");
620 }
621 lock_guard<mutex> temp_handle_guard(temp_handle_lock);
622 if (!temp_directory_handle) {
623 // temp directory has not been created yet: initialize it
624 temp_directory_handle = make_uniq<TemporaryDirectoryHandle>(args&: db, args&: temp_directory);
625 }
626}
627
628void StandardBufferManager::WriteTemporaryBuffer(block_id_t block_id, FileBuffer &buffer) {
629 RequireTemporaryDirectory();
630 if (buffer.size == Storage::BLOCK_SIZE) {
631 temp_directory_handle->GetTempFile().WriteTemporaryBuffer(block_id, buffer);
632 return;
633 }
634 // get the path to write to
635 auto path = GetTemporaryPath(id: block_id);
636 D_ASSERT(buffer.size > Storage::BLOCK_SIZE);
637 // create the file and write the size followed by the buffer contents
638 auto &fs = FileSystem::GetFileSystem(db);
639 auto handle = fs.OpenFile(path, flags: FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE);
640 handle->Write(buffer: &buffer.size, nr_bytes: sizeof(idx_t), location: 0);
641 buffer.Write(handle&: *handle, location: sizeof(idx_t));
642}
643
644unique_ptr<FileBuffer> StandardBufferManager::ReadTemporaryBuffer(block_id_t id,
645 unique_ptr<FileBuffer> reusable_buffer) {
646 D_ASSERT(!temp_directory.empty());
647 D_ASSERT(temp_directory_handle.get());
648 if (temp_directory_handle->GetTempFile().HasTemporaryBuffer(block_id: id)) {
649 return temp_directory_handle->GetTempFile().ReadTemporaryBuffer(id, reusable_buffer: std::move(reusable_buffer));
650 }
651 idx_t block_size;
652 // open the temporary file and read the size
653 auto path = GetTemporaryPath(id);
654 auto &fs = FileSystem::GetFileSystem(db);
655 auto handle = fs.OpenFile(path, flags: FileFlags::FILE_FLAGS_READ);
656 handle->Read(buffer: &block_size, nr_bytes: sizeof(idx_t), location: 0);
657
658 // now allocate a buffer of this size and read the data into that buffer
659 auto buffer =
660 ReadTemporaryBufferInternal(buffer_manager&: *this, handle&: *handle, position: sizeof(idx_t), size: block_size, id, reusable_buffer: std::move(reusable_buffer));
661
662 handle.reset();
663 DeleteTemporaryFile(id);
664 return buffer;
665}
666
667void StandardBufferManager::DeleteTemporaryFile(block_id_t id) {
668 if (temp_directory.empty()) {
669 // no temporary directory specified: nothing to delete
670 return;
671 }
672 {
673 lock_guard<mutex> temp_handle_guard(temp_handle_lock);
674 if (!temp_directory_handle) {
675 // temporary directory was not initialized yet: nothing to delete
676 return;
677 }
678 }
679 // check if we should delete the file from the shared pool of files, or from the general file system
680 if (temp_directory_handle->GetTempFile().HasTemporaryBuffer(block_id: id)) {
681 temp_directory_handle->GetTempFile().DeleteTemporaryBuffer(id);
682 return;
683 }
684 auto &fs = FileSystem::GetFileSystem(db);
685 auto path = GetTemporaryPath(id);
686 if (fs.FileExists(filename: path)) {
687 fs.RemoveFile(filename: path);
688 }
689}
690
691bool StandardBufferManager::HasTemporaryDirectory() const {
692 return !temp_directory.empty();
693}
694
695vector<TemporaryFileInformation> StandardBufferManager::GetTemporaryFiles() {
696 vector<TemporaryFileInformation> result;
697 if (temp_directory.empty()) {
698 return result;
699 }
700 {
701 lock_guard<mutex> temp_handle_guard(temp_handle_lock);
702 if (temp_directory_handle) {
703 result = temp_directory_handle->GetTempFile().GetTemporaryFiles();
704 }
705 }
706 auto &fs = FileSystem::GetFileSystem(db);
707 fs.ListFiles(directory: temp_directory, callback: [&](const string &name, bool is_dir) {
708 if (is_dir) {
709 return;
710 }
711 if (!StringUtil::EndsWith(str: name, suffix: ".block")) {
712 return;
713 }
714 TemporaryFileInformation info;
715 info.path = name;
716 auto handle = fs.OpenFile(path: name, flags: FileFlags::FILE_FLAGS_READ);
717 info.size = fs.GetFileSize(handle&: *handle);
718 handle.reset();
719 result.push_back(x: info);
720 });
721 return result;
722}
723
724const char *StandardBufferManager::InMemoryWarning() {
725 if (!temp_directory.empty()) {
726 return "";
727 }
728 return "\nDatabase is launched in in-memory mode and no temporary directory is specified."
729 "\nUnused blocks cannot be offloaded to disk."
730 "\n\nLaunch the database with a persistent storage back-end"
731 "\nOr set PRAGMA temp_directory='/path/to/tmp.tmp'";
732}
733
734void StandardBufferManager::ReserveMemory(idx_t size) {
735 if (size == 0) {
736 return;
737 }
738 auto reservation = EvictBlocksOrThrow(memory_delta: size, buffer: nullptr, args: "failed to reserve memory data of size %s%s",
739 args: StringUtil::BytesToHumanReadableString(bytes: size));
740 reservation.size = 0;
741}
742
743void StandardBufferManager::FreeReservedMemory(idx_t size) {
744 if (size == 0) {
745 return;
746 }
747 buffer_pool.current_memory -= size;
748}
749
750//===--------------------------------------------------------------------===//
751// Buffer Allocator
752//===--------------------------------------------------------------------===//
753data_ptr_t StandardBufferManager::BufferAllocatorAllocate(PrivateAllocatorData *private_data, idx_t size) {
754 auto &data = private_data->Cast<BufferAllocatorData>();
755 auto reservation = data.manager.EvictBlocksOrThrow(memory_delta: size, buffer: nullptr, args: "failed to allocate data of size %s%s",
756 args: StringUtil::BytesToHumanReadableString(bytes: size));
757 // We rely on manual tracking of this one. :(
758 reservation.size = 0;
759 return Allocator::Get(db&: data.manager.db).AllocateData(size);
760}
761
762void StandardBufferManager::BufferAllocatorFree(PrivateAllocatorData *private_data, data_ptr_t pointer, idx_t size) {
763 auto &data = private_data->Cast<BufferAllocatorData>();
764 BufferPoolReservation r(data.manager.GetBufferPool());
765 r.size = size;
766 r.Resize(new_size: 0);
767 return Allocator::Get(db&: data.manager.db).FreeData(pointer, size);
768}
769
770data_ptr_t StandardBufferManager::BufferAllocatorRealloc(PrivateAllocatorData *private_data, data_ptr_t pointer,
771 idx_t old_size, idx_t size) {
772 if (old_size == size) {
773 return pointer;
774 }
775 auto &data = private_data->Cast<BufferAllocatorData>();
776 BufferPoolReservation r(data.manager.GetBufferPool());
777 r.size = old_size;
778 r.Resize(new_size: size);
779 r.size = 0;
780 return Allocator::Get(db&: data.manager.db).ReallocateData(pointer, old_size, new_size: size);
781}
782
783Allocator &BufferAllocator::Get(ClientContext &context) {
784 auto &manager = StandardBufferManager::GetBufferManager(context);
785 return manager.GetBufferAllocator();
786}
787
788Allocator &BufferAllocator::Get(DatabaseInstance &db) {
789 return StandardBufferManager::GetBufferManager(db).GetBufferAllocator();
790}
791
792Allocator &BufferAllocator::Get(AttachedDatabase &db) {
793 return BufferAllocator::Get(db&: db.GetDatabase());
794}
795
796Allocator &StandardBufferManager::GetBufferAllocator() {
797 return buffer_allocator;
798}
799
800} // namespace duckdb
801