// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <deque>
#include <thread>

#include "alloc.h"
#include "light_epoch.h"

namespace FASTER {
namespace core {

/// The allocator used for the hash table's overflow buckets.

/// Address into a fixed page.
struct FixedPageAddress {
  static constexpr uint64_t kInvalidAddress = 0;

  /// A fixed-page address is 8 bytes.
  /// --of which 48 bits are used for the address. (The remaining 16 bits are used by the hash
  /// table, for control bits and the tag.)
  static constexpr uint64_t kAddressBits = 48;
  static constexpr uint64_t kMaxAddress = ((uint64_t)1 << kAddressBits) - 1;

  /// --of which 20 bits are used for offsets into a page, of size 2^20 (approximately 1 million)
  /// items.
  static constexpr uint64_t kOffsetBits = 20;
  static constexpr uint64_t kMaxOffset = ((uint64_t)1 << kOffsetBits) - 1;

  /// --and the remaining 28 bits are used for the page index, allowing for approximately 268
  /// million pages.
  static constexpr uint64_t kPageBits = kAddressBits - kOffsetBits;
  static constexpr uint64_t kMaxPage = ((uint64_t)1 << kPageBits) - 1;

  FixedPageAddress()
    : control_{ 0 } {
  }
  FixedPageAddress(uint64_t control)
    : control_{ control } {
  }

  bool operator==(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ == other.control_;
  }
  bool operator<(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ < other.control_;
  }
  bool operator>(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ > other.control_;
  }
  bool operator>=(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ >= other.control_;
  }
  FixedPageAddress operator++() {
    return FixedPageAddress{ ++control_ };
  }

  uint32_t offset() const {
    return static_cast<uint32_t>(offset_);
  }
  uint64_t page() const {
    return page_;
  }
  uint64_t control() const {
    return control_;
  }

  union {
      struct {
        uint64_t offset_ : kOffsetBits;         // 20 bits
        uint64_t page_ : kPageBits;             // 28 bits
        uint64_t reserved : 64 - kAddressBits;  // 16 bits
      };
      uint64_t control_;
  };
};
static_assert(sizeof(FixedPageAddress) == 8, "sizeof(FixedPageAddress) != 8");

/// Atomic address into a fixed page.
class AtomicFixedPageAddress {
 public:
  AtomicFixedPageAddress(const FixedPageAddress& address)
    : control_{ address.control_ } {
  }

  /// Atomic access.
  inline FixedPageAddress load() const {
    return FixedPageAddress{ control_.load() };
  }
  void store(FixedPageAddress value) {
    control_.store(value.control_);
  }
  FixedPageAddress operator++(int) {
    return FixedPageAddress{ control_++ };
  }

 private:
  /// Atomic access to the address.
  std::atomic<uint64_t> control_;
};
static_assert(sizeof(AtomicFixedPageAddress) == 8, "sizeof(AtomicFixedPageAddress) != 8");

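/// An address removed from the in-use set at `removal_epoch`; it may be reused once that epoch
/// is safe to reclaim.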
struct FreeAddress {
  FixedPageAddress removed_addr;
  uint64_t removal_epoch;
};

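/// A page of 2^20 fixed-size items.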
template <typename T>
class FixedPage {
 public:
  typedef T item_t;
  static constexpr uint64_t kPageSize = FixedPageAddress::kMaxOffset + 1;

  /// Accessors.
  inline const item_t& element(uint32_t offset) const {
    assert(offset <= FixedPageAddress::kMaxOffset);
    return elements_[offset];
  }
  inline item_t& element(uint32_t offset) {
    assert(offset <= FixedPageAddress::kMaxOffset);
    return elements_[offset];
  }

 private:
  /// The page's contents.
  item_t elements_[kPageSize];
  static_assert(alignof(item_t) <= Constants::kCacheLineBytes,
                "alignof(item_t) > Constants::kCacheLineBytes");
};

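/// A dynamically-resizable array of fixed pages. The array of atomic page pointers is laid out
/// immediately after the object itself; the array is grown by creating a larger FixedPageArray
/// and copying the old page pointers into it.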
template <typename T>
class FixedPageArray {
 public:
  typedef T item_t;
  typedef FixedPage<T> page_t;
  typedef FixedPageArray<T> array_t;

 protected:
  FixedPageArray(uint64_t alignment_, uint64_t size_, const array_t* old_array)
    : alignment{ alignment_ }
    , size{ size_ } {
    assert(Utility::IsPowerOfTwo(size));
    uint64_t idx = 0;
    if(old_array) {
      assert(old_array->size < size);
      for(; idx < old_array->size; ++idx) {
        page_t* page;
        page = old_array->pages()[idx].load(std::memory_order_acquire);
        while(page == nullptr) {
          std::this_thread::yield();
          page = old_array->pages()[idx].load(std::memory_order_acquire);
        }
        pages()[idx] = page;
      }
    }
    for(; idx < size; ++idx) {
      pages()[idx] = nullptr;
    }
  }

 public:
  static FixedPageArray* Create(uint64_t alignment, uint64_t size, const array_t* old_array) {
    void* buffer = std::malloc(sizeof(array_t) + size * sizeof(std::atomic<page_t*>));
    return new(buffer) array_t{ alignment, size, old_array };
  }

  static void Delete(array_t* arr, bool owns_pages) {
    assert(arr);
    if(owns_pages) {
      for(uint64_t idx = 0; idx < arr->size; ++idx) {
        page_t* page = arr->pages()[idx].load(std::memory_order_acquire);
        if(page) {
          page->~FixedPage();
          aligned_free(page);
        }
      }
    }
    arr->~FixedPageArray();
    std::free(arr);
  }

  /// Used by allocator.Get().
  inline page_t* Get(uint64_t page_idx) {
    assert(page_idx < size);
    return pages()[page_idx].load(std::memory_order_acquire);
  }

  /// Used by allocator.Allocate().
  inline page_t* GetOrAdd(uint64_t page_idx) {
    assert(page_idx < size);
    page_t* page = pages()[page_idx].load(std::memory_order_acquire);
    while(page == nullptr) {
      page = AddPage(page_idx);
    }
    return page;
  }

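  /// Allocates and installs the page at `page_idx`. If another thread installs its page first,
  /// frees the newly-allocated page and returns the winning thread's page.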
  inline page_t* AddPage(uint64_t page_idx) {
    assert(page_idx < size);
    void* buffer = aligned_alloc(alignment, sizeof(page_t));
    page_t* new_page = new(buffer) page_t{};
    page_t* expected = nullptr;
    if(pages()[page_idx].compare_exchange_strong(expected, new_page, std::memory_order_release)) {
      return new_page;
    } else {
      new_page->~page_t();
      aligned_free(new_page);
      return expected;
    }
  }

 private:
  /// Accessors, since zero-length arrays at the ends of structs aren't standard in C++.
  const std::atomic<page_t*>* pages() const {
    return reinterpret_cast<const std::atomic<page_t*>*>(this + 1);
  }
  std::atomic<page_t*>* pages() {
    return reinterpret_cast<std::atomic<page_t*>*>(this + 1);
  }

 public:
  /// Alignment at which each page is allocated.
  const uint64_t alignment;
  /// Maximum number of pages in the array; fixed at time of construction.
  const uint64_t size;
  /// Followed by [size] std::atomic<> pointers to (page_t) pages. (Not shown here.)
};

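/// A thread's free list, aligned to a cache line to avoid false sharing between threads.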
class alignas(Constants::kCacheLineBytes) FreeList {
 public:
  std::deque<FreeAddress> free_list;
};

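/// Allocator for fixed-size items, carved out of pages of 2^20 items each. Freed addresses go
/// onto a per-thread free list and are reused only after their removal epoch becomes safe to
/// reclaim; Initialize() reserves address 0 as the invalid (null) address.
///
/// Illustrative usage sketch (`Item`, `disk_t`, `epoch`, and `removal_epoch` stand in for the
/// caller's types and epoch bookkeeping):
///
///   MallocFixedPageSize<Item, disk_t> allocator;
///   allocator.Initialize(64, epoch);               // 64-byte page alignment.
///   FixedPageAddress addr = allocator.Allocate();  // Never the reserved null address (0).
///   Item& item = allocator.Get(addr);
///   allocator.FreeAtEpoch(addr, removal_epoch);    // Reused once removal_epoch is reclaimable.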
template <typename T, class D>
class MallocFixedPageSize {
 public:
  typedef T item_t;
  typedef D disk_t;
  typedef typename D::file_t file_t;
  typedef FixedPage<T> page_t;
  typedef FixedPageArray<T> array_t;
  typedef MallocFixedPageSize<T, disk_t> alloc_t;

  MallocFixedPageSize()
    : alignment_{ UINT64_MAX }
    , page_array_{ nullptr }
    , count_{ 0 }
    , epoch_{ nullptr }
    , disk_{ nullptr }
    , pending_checkpoint_writes_{ 0 }
    , pending_recover_reads_{ 0 }
    , checkpoint_pending_{ false }
    , checkpoint_failed_{ false }
    , recover_pending_{ false }
    , recover_failed_{ false } {
  }

  ~MallocFixedPageSize() {
    if(page_array_.load() != nullptr) {
      array_t::Delete(page_array_.load(), true);
    }
  }

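  /// (Re)initializes the allocator: creates a fresh page array with `alignment`-aligned pages,
  /// registers the epoch, and reserves address 0 as the invalid (null) address.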
  inline void Initialize(uint64_t alignment, LightEpoch& epoch) {
    if(page_array_.load() != nullptr) {
      array_t::Delete(page_array_.load(), true);
    }
    alignment_ = alignment;
    count_.store(0);
    epoch_ = &epoch;
    disk_ = nullptr;
    pending_checkpoint_writes_ = 0;
    pending_recover_reads_ = 0;
    checkpoint_pending_ = false;
    checkpoint_failed_ = false;
    recover_pending_ = false;
    recover_failed_ = false;

    array_t* page_array = array_t::Create(alignment, 2, nullptr);
    page_array->AddPage(0);
    page_array_.store(page_array, std::memory_order_release);
    // Allocate address 0, reserving it as the invalid (null) address.
    Allocate();
  }

  inline void Uninitialize() {
    if(page_array_.load() != nullptr) {
      array_t::Delete(page_array_.load(), true);
      page_array_.store(nullptr);
    }
  }

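  /// Returns the item at `address`; the page containing it must already have been allocated.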
  inline item_t& Get(FixedPageAddress address) {
    page_t* page = page_array_.load(std::memory_order_acquire)->Get(address.page());
    assert(page);
    return page->element(address.offset());
  }
  inline const item_t& Get(FixedPageAddress address) const {
    page_t* page = page_array_.load(std::memory_order_acquire)->Get(address.page());
    assert(page);
    return page->element(address.offset());
  }

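  /// Allocates one item and returns its address, preferring a reusable address from the calling
  /// thread's free list.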
  FixedPageAddress Allocate();

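  /// Places `addr` on the calling thread's free list; it will not be reused until `removed_epoch`
  /// is safe to reclaim.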
  void FreeAtEpoch(FixedPageAddress addr, uint64_t removed_epoch) {
    free_list().push_back(FreeAddress{ addr, removed_epoch });
  }

  /// Checkpointing and recovery.
  Status Checkpoint(disk_t& disk, file_t&& file, uint64_t& size);
  Status CheckpointComplete(bool wait);

  Status Recover(disk_t& disk, file_t&& file, uint64_t file_size, FixedPageAddress count);
  Status RecoverComplete(bool wait);

  std::deque<FreeAddress>& free_list() {
    return free_list_[Thread::id()].free_list;
  }
  const std::deque<FreeAddress>& free_list() const {
    return free_list_[Thread::id()].free_list;
  }

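  /// One past the highest address handed out so far; allocations served from the free list do
  /// not advance it.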
  FixedPageAddress count() const {
    return count_.load();
  }

 private:
  /// Checkpointing and recovery.
  class AsyncIoContext : public IAsyncContext {
   public:
    AsyncIoContext(alloc_t* allocator_)
      : allocator{ allocator_ } {
    }

    /// The deep-copy constructor.
    AsyncIoContext(const AsyncIoContext& other)
      : allocator{ other.allocator } {
    }

   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }

   public:
    alloc_t* allocator;
  };

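  /// Swaps in a page array of `new_size` (a power of two) slots, retiring the old array under
  /// epoch protection. Returns the current array, which another thread may have expanded first.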
  array_t* ExpandArray(array_t* expected, uint64_t new_size);

 private:
  /// Alignment at which each page is allocated.
  uint64_t alignment_;
  /// Array of all of the pages we've allocated.
  std::atomic<array_t*> page_array_;
  /// How many elements we've allocated.
  AtomicFixedPageAddress count_;

  LightEpoch* epoch_;

  /// State for ongoing checkpoint/recovery.
  disk_t* disk_;
  file_t file_;
  std::atomic<uint64_t> pending_checkpoint_writes_;
  std::atomic<uint64_t> pending_recover_reads_;
  std::atomic<bool> checkpoint_pending_;
  std::atomic<bool> checkpoint_failed_;
  std::atomic<bool> recover_pending_;
  std::atomic<bool> recover_failed_;

  FreeList free_list_[Thread::kMaxNumThreads];
};

/// Implementations.
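
/// Asynchronously writes every in-use page, in full, to the checkpoint file; the last write to
/// complete closes the file. On success, `size` is set to the logical size of the checkpointed
/// data, in bytes.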
template <typename T, class F>
Status MallocFixedPageSize<T, F>::Checkpoint(disk_t& disk, file_t&& file, uint64_t& size) {
  constexpr uint32_t kWriteSize = page_t::kPageSize * sizeof(item_t);

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->allocator->checkpoint_failed_ = true;
    }
    if(--context->allocator->pending_checkpoint_writes_ == 0) {
      result = context->allocator->file_.Close();
      if(result != Status::Ok) {
        context->allocator->checkpoint_failed_ = true;
      }
      context->allocator->checkpoint_pending_ = false;
    }
  };

  disk_ = &disk;
  file_ = std::move(file);
  size = 0;
  checkpoint_failed_ = false;
  array_t* page_array = page_array_.load();
  FixedPageAddress count = count_.load();

  uint64_t num_levels = count.page() + (count.offset() > 0 ? 1 : 0);
  assert(!checkpoint_pending_);
  assert(pending_checkpoint_writes_ == 0);
  checkpoint_pending_ = true;
  pending_checkpoint_writes_ = num_levels;
  for(uint64_t idx = 0; idx < num_levels; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.WriteAsync(page_array->Get(idx), idx * kWriteSize, kWriteSize, callback,
                                   context));
  }
  size = count.control_ * sizeof(item_t);
  return Status::Ok;
}

template <typename T, class F>
Status MallocFixedPageSize<T, F>::CheckpointComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !checkpoint_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !checkpoint_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return checkpoint_failed_ ? Status::IOError : Status::Ok;
  }
}

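/// Asynchronously reads the checkpointed pages back from the file, first expanding the page
/// array (if needed) so it can hold `count` items; the last read to complete closes the file.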
template <typename T, class F>
Status MallocFixedPageSize<T, F>::Recover(disk_t& disk, file_t&& file, uint64_t file_size,
                                          FixedPageAddress count) {
  constexpr uint64_t kReadSize = page_t::kPageSize * sizeof(item_t);

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->allocator->recover_failed_ = true;
    }
    if(--context->allocator->pending_recover_reads_ == 0) {
      result = context->allocator->file_.Close();
      if(result != Status::Ok) {
        context->allocator->recover_failed_ = true;
      }
      context->allocator->recover_pending_ = false;
    }
  };

  assert(file_size % sizeof(item_t) == 0);
  disk_ = &disk;
  file_ = std::move(file);
  recover_failed_ = false;

  // The size reserved by recovery is >= the size checkpointed to disk.
  FixedPageAddress file_end_addr{ file_size / sizeof(item_t) };
  uint64_t num_file_levels = file_end_addr.page() + (file_end_addr.offset() > 0 ? 1 : 0);
  assert(num_file_levels > 0);
  assert(count >= file_end_addr);
  uint64_t num_levels = count.page() + (count.offset() > 0 ? 1 : 0);
  assert(num_levels > 0);

  array_t* page_array = page_array_.load();
  // Ensure that the allocator has enough pages.
  if(page_array->size < num_levels) {
    uint64_t new_size = next_power_of_two(num_levels);
    page_array = ExpandArray(page_array, new_size);
  }
  count_.store(count);
  assert(!recover_pending_);
  assert(pending_recover_reads_.load() == 0);
  recover_pending_ = true;
  pending_recover_reads_ = num_file_levels;
  for(uint64_t idx = 0; idx < num_file_levels; ++idx) {
    // Read a full page.
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.ReadAsync(idx * kReadSize, page_array->GetOrAdd(idx), kReadSize, callback,
                                  context));
  }
  return Status::Ok;
}

template <typename T, class F>
Status MallocFixedPageSize<T, F>::RecoverComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !recover_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !recover_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return recover_failed_ ? Status::IOError : Status::Ok;
  }
}

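/// If the compare-exchange fails, another thread has already installed a larger array; retry
/// until the installed array has at least `new_size` slots. The old array (but not its pages,
/// which the new array now owns) is freed once the bumped epoch becomes safe.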
template <typename T, class F>
FixedPageArray<T>* MallocFixedPageSize<T, F>::ExpandArray(array_t* expected, uint64_t new_size) {
  class Delete_Context : public IAsyncContext {
   public:
    Delete_Context(array_t* arr_)
      : arr{ arr_ } {
    }
    /// The deep-copy constructor.
    Delete_Context(const Delete_Context& other)
      : arr{ other.arr } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    array_t* arr;
  };

  auto delete_callback = [](IAsyncContext* ctxt) {
    CallbackContext<Delete_Context> context{ ctxt };
    array_t::Delete(context->arr, false);
  };

  assert(Utility::IsPowerOfTwo(new_size));
  do {
    array_t* new_array = array_t::Create(alignment_, new_size, expected);
    if(page_array_.compare_exchange_strong(expected, new_array, std::memory_order_release)) {
      // Have to free the old array, under epoch protection.
      Delete_Context context{ expected };
      IAsyncContext* context_copy;
      Status result = context.DeepCopy(context_copy);
      assert(result == Status::Ok);
      epoch_->BumpCurrentEpoch(delete_callback, context_copy);
      return new_array;
    } else {
      new_array->~array_t();
      std::free(new_array);
    }
  } while(expected->size < new_size);
  return expected;
}

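/// Reuses the head of the calling thread's free list if its removal epoch is safe to reclaim;
/// otherwise claims the next unallocated address, expanding the page array and adding pages as
/// needed.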
template <typename T, class F>
inline FixedPageAddress MallocFixedPageSize<T, F>::Allocate() {
  if(!free_list().empty()) {
    // Check the head of the free list.
    if(free_list().front().removal_epoch <= epoch_->safe_to_reclaim_epoch.load()) {
      FixedPageAddress removed_addr = free_list().front().removed_addr;
      free_list().pop_front();
      return removed_addr;
    }
  }
  // No reusable address; take the next unallocated address.
  FixedPageAddress addr = count_++;
  array_t* page_array = page_array_.load(std::memory_order_acquire);
  if(addr.page() >= page_array->size) {
    // Need to resize the page array.
    page_array = ExpandArray(page_array, next_power_of_two(addr.page() + 1));
  }
  if(addr.offset() == 0 && addr.page() + 1 < page_array->size) {
    // Add the next page early, to try to avoid blocking other threads.
    page_array->AddPage(addr.page() + 1);
  }
  page_array->GetOrAdd(addr.page());
  return addr;
}

} // namespace core
} // namespace FASTER