// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <deque>
#include <thread>

#include "alloc.h"
#include "light_epoch.h"

namespace FASTER {
namespace core {

/// The allocator used for the hash table's overflow buckets.

/// Address into a fixed page.
struct FixedPageAddress {
  static constexpr uint64_t kInvalidAddress = 0;

  /// A fixed-page address is 8 bytes,
  /// --of which 48 bits are used for the address. (The remaining 16 bits are used by the hash
  /// table, for control bits and the tag.)
  static constexpr uint64_t kAddressBits = 48;
  static constexpr uint64_t kMaxAddress = ((uint64_t)1 << kAddressBits) - 1;

  /// --of which 20 bits are used for the offset into a page, of size 2^20 (~1 million) items.
  static constexpr uint64_t kOffsetBits = 20;
  static constexpr uint64_t kMaxOffset = ((uint64_t)1 << kOffsetBits) - 1;

  /// --and the remaining 28 bits are used for the page index, allowing for roughly 268 million
  /// (2^28) pages.
  static constexpr uint64_t kPageBits = kAddressBits - kOffsetBits;
  static constexpr uint64_t kMaxPage = ((uint64_t)1 << kPageBits) - 1;
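  /// For example (illustrative only): control() == 0x200005 decodes to page 2, offset 5, since
  /// the low 20 bits hold the offset and the next 28 bits hold the page index.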

  FixedPageAddress()
    : control_{ 0 } {
  }
  FixedPageAddress(uint64_t control)
    : control_{ control } {
  }

  bool operator==(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ == other.control_;
  }
  bool operator<(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ < other.control_;
  }
  bool operator>(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ > other.control_;
  }
  bool operator>=(const FixedPageAddress& other) const {
    assert(reserved == 0);
    assert(other.reserved == 0);
    return control_ >= other.control_;
  }
  FixedPageAddress operator++() {
    return FixedPageAddress{ ++control_ };
  }

  uint32_t offset() const {
    return static_cast<uint32_t>(offset_);
  }
  uint64_t page() const {
    return page_;
  }
  uint64_t control() const {
    return control_;
  }

  union {
    struct {
      uint64_t offset_ : kOffsetBits;        // 20 bits
      uint64_t page_ : kPageBits;            // 28 bits
      uint64_t reserved : 64 - kAddressBits; // 16 bits
    };
    uint64_t control_;
  };
};
static_assert(sizeof(FixedPageAddress) == 8, "sizeof(FixedPageAddress) != 8");

/// Atomic address into a fixed page.
class AtomicFixedPageAddress {
 public:
  AtomicFixedPageAddress(const FixedPageAddress& address)
    : control_{ address.control_ } {
  }

  /// Atomic access.
  inline FixedPageAddress load() const {
    return FixedPageAddress{ control_.load() };
  }
  void store(FixedPageAddress value) {
    control_.store(value.control_);
  }
  FixedPageAddress operator++(int) {
    return FixedPageAddress{ control_++ };
  }

 private:
  /// Atomic access to the address.
  std::atomic<uint64_t> control_;
};
static_assert(sizeof(AtomicFixedPageAddress) == 8, "sizeof(AtomicFixedPageAddress) != 8");

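/// An address freed by some thread, tagged with the epoch at which it was removed; it can be
/// reused once that epoch becomes safe to reclaim.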
struct FreeAddress {
  FixedPageAddress removed_addr;
  uint64_t removal_epoch;
};

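/// A page of 2^20 fixed-size items.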
template <typename T>
class FixedPage {
 public:
  typedef T item_t;
  static constexpr uint64_t kPageSize = FixedPageAddress::kMaxOffset + 1;

  /// Accessors.
  inline const item_t& element(uint32_t offset) const {
    assert(offset <= FixedPageAddress::kMaxOffset);
    return elements_[offset];
  }
  inline item_t& element(uint32_t offset) {
    assert(offset <= FixedPageAddress::kMaxOffset);
    return elements_[offset];
  }

 private:
  /// The page's contents.
  item_t elements_[kPageSize];
  static_assert(alignof(item_t) <= Constants::kCacheLineBytes,
                "alignof(item_t) > Constants::kCacheLineBytes");
};

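/// A dynamically sized array of pointers to fixed pages. Pages are allocated and installed
/// lazily; the array itself is grown by creating a larger array that takes over the existing
/// page pointers.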
template <typename T>
class FixedPageArray {
 public:
  typedef T item_t;
  typedef FixedPage<T> page_t;
  typedef FixedPageArray<T> array_t;

 protected:
  FixedPageArray(uint64_t alignment_, uint64_t size_, const array_t* old_array)
    : alignment{ alignment_ }
    , size{ size_ } {
    assert(Utility::IsPowerOfTwo(size));
    uint64_t idx = 0;
    if(old_array) {
      assert(old_array->size < size);
      for(; idx < old_array->size; ++idx) {
        page_t* page = old_array->pages()[idx].load(std::memory_order_acquire);
        while(page == nullptr) {
          std::this_thread::yield();
          page = old_array->pages()[idx].load(std::memory_order_acquire);
        }
        pages()[idx] = page;
      }
    }
    for(; idx < size; ++idx) {
      pages()[idx] = nullptr;
    }
  }

 public:
  static FixedPageArray* Create(uint64_t alignment, uint64_t size, const array_t* old_array) {
    void* buffer = std::malloc(sizeof(array_t) + size * sizeof(std::atomic<page_t*>));
    return new(buffer) array_t{ alignment, size, old_array };
  }

  static void Delete(array_t* arr, bool owns_pages) {
    assert(arr);
    if(owns_pages) {
      for(uint64_t idx = 0; idx < arr->size; ++idx) {
        page_t* page = arr->pages()[idx].load(std::memory_order_acquire);
        if(page) {
          page->~FixedPage();
          aligned_free(page);
        }
      }
    }
    arr->~FixedPageArray();
    std::free(arr);
  }

  /// Used by allocator.Get().
  inline page_t* Get(uint64_t page_idx) {
    assert(page_idx < size);
    return pages()[page_idx].load(std::memory_order_acquire);
  }

  /// Used by allocator.Allocate().
  inline page_t* GetOrAdd(uint64_t page_idx) {
    assert(page_idx < size);
    page_t* page = pages()[page_idx].load(std::memory_order_acquire);
    while(page == nullptr) {
      page = AddPage(page_idx);
    }
    return page;
  }

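  /// Allocate a page and install it at the given index; if another thread installs one first,
  /// the local allocation is discarded and the winner's page is returned instead.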
  inline page_t* AddPage(uint64_t page_idx) {
    assert(page_idx < size);
    void* buffer = aligned_alloc(alignment, sizeof(page_t));
    page_t* new_page = new(buffer) page_t{};
    page_t* expected = nullptr;
    if(pages()[page_idx].compare_exchange_strong(expected, new_page, std::memory_order_release)) {
      return new_page;
    } else {
      new_page->~page_t();
      aligned_free(new_page);
      return expected;
    }
  }

 private:
  /// Accessors, since zero-length arrays at the ends of structs aren't standard in C++.
  const std::atomic<page_t*>* pages() const {
    return reinterpret_cast<const std::atomic<page_t*>*>(this + 1);
  }
  std::atomic<page_t*>* pages() {
    return reinterpret_cast<std::atomic<page_t*>*>(this + 1);
  }

 public:
  /// Alignment at which each page is allocated.
  const uint64_t alignment;
  /// Maximum number of pages in the array; fixed at time of construction.
  const uint64_t size;
  /// Followed by [size] std::atomic<> pointers to (page_t) pages. (Not shown here.)
};

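/// Each thread has its own free list; the class is cache-line aligned to avoid false sharing
/// between threads.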
class alignas(Constants::kCacheLineBytes) FreeList {
 public:
  std::deque<FreeAddress> free_list;
};

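/// An allocator for fixed-size items of type T, built on an expandable array of 2^20-item
/// pages. Freed addresses go onto per-thread free lists and are recycled once their removal
/// epoch is safe to reclaim; the allocated items can be checkpointed to, and recovered from,
/// a file.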
template <typename T, class D>
class MallocFixedPageSize {
 public:
  typedef T item_t;
  typedef D disk_t;
  typedef typename D::file_t file_t;
  typedef FixedPage<T> page_t;
  typedef FixedPageArray<T> array_t;
  typedef MallocFixedPageSize<T, disk_t> alloc_t;

  MallocFixedPageSize()
    : alignment_{ UINT64_MAX }
    , count_{ 0 }
    , epoch_{ nullptr }
    , page_array_{ nullptr }
    , disk_{ nullptr }
    , pending_checkpoint_writes_{ 0 }
    , pending_recover_reads_{ 0 }
    , checkpoint_pending_{ false }
    , checkpoint_failed_{ false }
    , recover_pending_{ false }
    , recover_failed_{ false } {
  }

  ~MallocFixedPageSize() {
    if(page_array_.load() != nullptr) {
      array_t::Delete(page_array_.load(), true);
    }
  }

  inline void Initialize(uint64_t alignment, LightEpoch& epoch) {
    if(page_array_.load() != nullptr) {
      array_t::Delete(page_array_.load(), true);
    }
    alignment_ = alignment;
    count_.store(0);
    epoch_ = &epoch;
    disk_ = nullptr;
    pending_checkpoint_writes_ = 0;
    pending_recover_reads_ = 0;
    checkpoint_pending_ = false;
    checkpoint_failed_ = false;
    recover_pending_ = false;
    recover_failed_ = false;

    array_t* page_array = array_t::Create(alignment, 2, nullptr);
    page_array->AddPage(0);
    page_array_.store(page_array, std::memory_order_release);
    // Allocate the null pointer: address 0 (kInvalidAddress) is consumed here, so it is never
    // handed out for a real allocation.
    Allocate();
  }

  inline void Uninitialize() {
    if(page_array_.load() != nullptr) {
      array_t::Delete(page_array_.load(), true);
      page_array_.store(nullptr);
    }
  }

  inline item_t& Get(FixedPageAddress address) {
    page_t* page = page_array_.load(std::memory_order_acquire)->Get(address.page());
    assert(page);
    return page->element(address.offset());
  }
  inline const item_t& Get(FixedPageAddress address) const {
    page_t* page = page_array_.load(std::memory_order_acquire)->Get(address.page());
    assert(page);
    return page->element(address.offset());
  }

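  /// Allocate one item: reuse the head of the calling thread's free list if its removal epoch is
  /// already safe to reclaim; otherwise take the next fresh slot, growing the page array if
  /// necessary. Returns the address of the allocated item.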
  FixedPageAddress Allocate();

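  /// Free the item at the given address. The address goes onto the calling thread's free list,
  /// tagged with the epoch at which the item was removed.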
  void FreeAtEpoch(FixedPageAddress addr, uint64_t removed_epoch) {
    free_list().push_back(FreeAddress{ addr, removed_epoch });
  }

  /// Checkpointing and recovery.
  Status Checkpoint(disk_t& disk, file_t&& file, uint64_t& size);
  Status CheckpointComplete(bool wait);

  Status Recover(disk_t& disk, file_t&& file, uint64_t file_size, FixedPageAddress count);
  Status RecoverComplete(bool wait);

  std::deque<FreeAddress>& free_list() {
    return free_list_[Thread::id()].free_list;
  }
  const std::deque<FreeAddress>& free_list() const {
    return free_list_[Thread::id()].free_list;
  }

  FixedPageAddress count() const {
    return count_.load();
  }

 private:
  /// Checkpointing and recovery.
  class AsyncIoContext : public IAsyncContext {
   public:
    AsyncIoContext(alloc_t* allocator_)
      : allocator{ allocator_ } {
    }

    /// The deep-copy constructor.
    AsyncIoContext(AsyncIoContext& other)
      : allocator{ other.allocator } {
    }

   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }

   public:
    alloc_t* allocator;
  };

  array_t* ExpandArray(array_t* expected, uint64_t new_size);

 private:
  /// Alignment at which each page is allocated.
  uint64_t alignment_;
  /// Array of all of the pages we've allocated.
  std::atomic<array_t*> page_array_;
  /// How many elements we've allocated.
  AtomicFixedPageAddress count_;

  LightEpoch* epoch_;

  /// State for ongoing checkpoint/recovery.
  disk_t* disk_;
  file_t file_;
  std::atomic<uint64_t> pending_checkpoint_writes_;
  std::atomic<uint64_t> pending_recover_reads_;
  std::atomic<bool> checkpoint_pending_;
  std::atomic<bool> checkpoint_failed_;
  std::atomic<bool> recover_pending_;
  std::atomic<bool> recover_failed_;

  FreeList free_list_[Thread::kMaxNumThreads];
};
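
/// Example usage (a sketch only; `Item`, `disk_t`, `epoch`, and `removal_epoch` are hypothetical
/// stand-ins for types and values supplied by the surrounding FASTER code):
///
///   MallocFixedPageSize<Item, disk_t> allocator;
///   allocator.Initialize(64, epoch);               // pages allocated at 64-byte alignment
///   FixedPageAddress addr = allocator.Allocate();  // address 0 is reserved as "invalid"
///   allocator.Get(addr) = Item{};                  // construct/assign the new item in place
///   // ... later, once the item has been removed at some epoch:
///   allocator.FreeAtEpoch(addr, removal_epoch);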

/// Implementations.
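
/// Checkpoint(): write every allocated page (up to count_) to the given file. The callback for
/// the last outstanding write closes the file and clears checkpoint_pending_; `size` is set to
/// count * sizeof(item_t), the logical size of the allocated data.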
template <typename T, class F>
Status MallocFixedPageSize<T, F>::Checkpoint(disk_t& disk, file_t&& file, uint64_t& size) {
  constexpr uint32_t kWriteSize = page_t::kPageSize * sizeof(item_t);

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->allocator->checkpoint_failed_ = true;
    }
    if(--context->allocator->pending_checkpoint_writes_ == 0) {
      result = context->allocator->file_.Close();
      if(result != Status::Ok) {
        context->allocator->checkpoint_failed_ = true;
      }
      context->allocator->checkpoint_pending_ = false;
    }
  };

  disk_ = &disk;
  file_ = std::move(file);
  size = 0;
  checkpoint_failed_ = false;
  array_t* page_array = page_array_.load();
  FixedPageAddress count = count_.load();

  uint64_t num_levels = count.page() + (count.offset() > 0 ? 1 : 0);
  assert(!checkpoint_pending_);
  assert(pending_checkpoint_writes_ == 0);
  checkpoint_pending_ = true;
  pending_checkpoint_writes_ = num_levels;
  for(uint64_t idx = 0; idx < num_levels; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.WriteAsync(page_array->Get(idx), idx * kWriteSize, kWriteSize, callback,
                                   context));
  }
  size = count.control_ * sizeof(item_t);
  return Status::Ok;
}

template <typename T, class F>
Status MallocFixedPageSize<T, F>::CheckpointComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !checkpoint_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !checkpoint_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return checkpoint_failed_ ? Status::IOError : Status::Ok;
  }
}

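/// Recover(): read the checkpointed pages back from the given file. `count` is the number of
/// items to reserve, which may exceed what was actually written to disk; the page array is
/// expanded up front to cover it.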
template <typename T, class F>
Status MallocFixedPageSize<T, F>::Recover(disk_t& disk, file_t&& file, uint64_t file_size,
                                          FixedPageAddress count) {
  constexpr uint64_t kReadSize = page_t::kPageSize * sizeof(item_t);

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->allocator->recover_failed_ = true;
    }
    if(--context->allocator->pending_recover_reads_ == 0) {
      result = context->allocator->file_.Close();
      if(result != Status::Ok) {
        context->allocator->recover_failed_ = true;
      }
      context->allocator->recover_pending_ = false;
    }
  };

  assert(file_size % sizeof(item_t) == 0);
  disk_ = &disk;
  file_ = std::move(file);
  recover_failed_ = false;

  // The size reserved by recovery is >= the size checkpointed to disk.
  FixedPageAddress file_end_addr{ file_size / sizeof(item_t) };
  uint64_t num_file_levels = file_end_addr.page() + (file_end_addr.offset() > 0 ? 1 : 0);
  assert(num_file_levels > 0);
  assert(count >= file_end_addr);
  uint64_t num_levels = count.page() + (count.offset() > 0 ? 1 : 0);
  assert(num_levels > 0);

  array_t* page_array = page_array_.load();
  // Ensure that the allocator has enough pages.
  if(page_array->size < num_levels) {
    uint64_t new_size = next_power_of_two(num_levels);
    page_array = ExpandArray(page_array, new_size);
  }
  count_.store(count);
  assert(!recover_pending_);
  assert(pending_recover_reads_.load() == 0);
  recover_pending_ = true;
  pending_recover_reads_ = num_file_levels;
  for(uint64_t idx = 0; idx < num_file_levels; ++idx) {
    // Read a full page.
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.ReadAsync(idx * kReadSize, page_array->GetOrAdd(idx), kReadSize, callback,
                                  context));
  }
  return Status::Ok;
}

template <typename T, class F>
Status MallocFixedPageSize<T, F>::RecoverComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !recover_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !recover_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return recover_failed_ ? Status::IOError : Status::Ok;
  }
}

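/// ExpandArray(): atomically replace the page array with a larger one (the new array takes over
/// the existing page pointers). The old array, but not its pages, is freed once the current
/// epoch becomes safe to reclaim.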
template <typename T, class F>
FixedPageArray<T>* MallocFixedPageSize<T, F>::ExpandArray(array_t* expected, uint64_t new_size) {
  class Delete_Context : public IAsyncContext {
   public:
    Delete_Context(array_t* arr_)
      : arr{ arr_ } {
    }
    /// The deep-copy constructor.
    Delete_Context(const Delete_Context& other)
      : arr{ other.arr } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    array_t* arr;
  };

  auto delete_callback = [](IAsyncContext* ctxt) {
    CallbackContext<Delete_Context> context{ ctxt };
    array_t::Delete(context->arr, false);
  };

  assert(Utility::IsPowerOfTwo(new_size));
  do {
    array_t* new_array = array_t::Create(alignment_, new_size, expected);
    if(page_array_.compare_exchange_strong(expected, new_array, std::memory_order_release)) {
      // Have to free the old array, under epoch protection.
      Delete_Context context{ expected };
      IAsyncContext* context_copy;
      Status result = context.DeepCopy(context_copy);
      assert(result == Status::Ok);
      epoch_->BumpCurrentEpoch(delete_callback, context_copy);
      return new_array;
    } else {
      // Another thread installed a new array first; discard ours (but not the pages, which the
      // winning array now references).
      new_array->~array_t();
      std::free(new_array);
    }
  } while(expected->size < new_size);
  return expected;
}

template <typename T, class F>
inline FixedPageAddress MallocFixedPageSize<T, F>::Allocate() {
  if(!free_list().empty()) {
    // Check the head of the free list.
    if(free_list().front().removal_epoch <= epoch_->safe_to_reclaim_epoch.load()) {
      FixedPageAddress removed_addr = free_list().front().removed_addr;
      free_list().pop_front();
      return removed_addr;
    }
  }
  // Determine the insertion page index.
  FixedPageAddress addr = count_++;
  array_t* page_array = page_array_.load(std::memory_order_acquire);
  if(addr.page() >= page_array->size) {
    // Need to resize the page array.
    page_array = ExpandArray(page_array, next_power_of_two(addr.page() + 1));
  }
  if(addr.offset() == 0 && addr.page() + 1 < page_array->size) {
    // Add the next page early, to try to avoid blocking other threads.
    page_array->AddPage(addr.page() + 1);
  }
  page_array->GetOrAdd(addr.page());
  return addr;
}

}
} // namespace FASTER::core