// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <thread>

#include "device/file_system_disk.h"
#include "address.h"
#include "async_result_types.h"
#include "gc_state.h"
#include "light_epoch.h"
#include "native_buffer_pool.h"
#include "recovery_status.h"
#include "status.h"

namespace FASTER {
namespace core {

/// The log allocator, used by FASTER to store records.

enum class FlushStatus : uint8_t {
  Flushed,
  InProgress
};

enum class CloseStatus : uint8_t {
  Closed,
  Open
};

/// Pack flush- and close-status into a single 16-bit value.
/// State transitions are:
/// { Flushed, Closed } (default state)
/// --> { InProgress, Open } (when issuing the flush to disk)
/// --> either { . , Closed } (when moving the head address forward)
///     or { Flushed, . } (when the flush completes).
struct FlushCloseStatus {
  FlushCloseStatus()
    : flush{ FlushStatus::Flushed }
    , close{ CloseStatus::Closed } {
  }

  FlushCloseStatus(FlushStatus flush_, CloseStatus close_)
    : flush{ flush_ }
    , close{ close_ } {
  }

  FlushCloseStatus(uint16_t control_)
    : control{ control_ } {
  }

  /// Is the page ready for use?
  inline bool Ready() const {
    return flush == FlushStatus::Flushed && close == CloseStatus::Open;
  }

  union {
    struct {
      FlushStatus flush;
      CloseStatus close;
    };
    uint16_t control;
  };
};
static_assert(sizeof(FlushCloseStatus) == 2, "sizeof(FlushCloseStatus) != 2");

/// Atomic version of FlushCloseStatus. Can set and get flush- and close-status, together,
/// atomically.
class AtomicFlushCloseStatus {
 public:
  AtomicFlushCloseStatus()
    : status_{} {
  }

  inline void store(FlushStatus flush, CloseStatus close) {
    // Sets flush and close statuses, atomically.
    FlushCloseStatus status{ flush, close };
    control_.store(status.control);
  }

  inline FlushCloseStatus load() const {
    // Gets flush and close statuses, atomically.
    return FlushCloseStatus{ control_.load() };
  }

  inline bool compare_exchange_weak(FlushCloseStatus& expected, FlushCloseStatus value) {
    uint16_t expected_control = expected.control;
    bool result = control_.compare_exchange_weak(expected_control, value.control);
    expected.control = expected_control;
    return result;
  }
  inline bool compare_exchange_strong(FlushCloseStatus& expected, FlushCloseStatus value) {
    uint16_t expected_control = expected.control;
    bool result = control_.compare_exchange_strong(expected_control, value.control);
    expected.control = expected_control;
    return result;
  }

  union {
    FlushCloseStatus status_;
    std::atomic<uint16_t> control_;
  };
};
static_assert(sizeof(AtomicFlushCloseStatus) == 2, "sizeof(AtomicFlushCloseStatus) != 2");
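
// Illustrative sketch (not part of the allocator): the typical way the code below transitions the
// packed status is a small CAS loop that rewrites one field while preserving the other, e.g.
// marking a page flushed without disturbing its close-status:
//
//   AtomicFlushCloseStatus status;
//   FlushCloseStatus old_status = status.load();
//   FlushCloseStatus new_status;
//   do {
//     new_status = FlushCloseStatus{ FlushStatus::Flushed, old_status.close };
//   } while(!status.compare_exchange_weak(old_status, new_status));
//
// Because both fields live in one 16-bit word, no thread can observe a torn flush/close pair.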

struct FullPageStatus {
  FullPageStatus()
    : LastFlushedUntilAddress{ 0 }
    , status{} {
  }

  AtomicAddress LastFlushedUntilAddress;
  AtomicFlushCloseStatus status;
};
static_assert(sizeof(FullPageStatus) == 16, "sizeof(FullPageStatus) != 16");

/// Page and offset of the tail of the log. Can reserve space within the current page or move to a
/// new page.
class PageOffset {
 public:
  PageOffset(uint32_t page, uint64_t offset)
    : offset_{ offset }
    , page_{ page } {
    assert(page <= Address::kMaxPage);
  }

  PageOffset(uint64_t control)
    : control_{ control } {
  }

  PageOffset(const Address& address)
    : offset_{ address.offset() }
    , page_{ address.page() } {
  }

  /// Accessors.
  inline uint64_t offset() const {
    return offset_;
  }
  inline uint32_t page() const {
    return static_cast<uint32_t>(page_);
  }
  inline uint64_t control() const {
    return control_;
  }

  /// Conversion operator.
  inline operator Address() const {
    assert(offset_ <= Address::kMaxOffset);
    return Address{ page(), static_cast<uint32_t>(offset()) };
  }

 private:
  /// Use 41 bits for offset, which gives us approximately 2 TB of overflow space, for
  /// Reserve().
  union {
    struct {
      uint64_t offset_ : 64 - Address::kPageBits;
      uint64_t page_ : Address::kPageBits;
    };
    uint64_t control_;
  };
};
static_assert(sizeof(PageOffset) == 8, "sizeof(PageOffset) != 8");

/// Atomic page + offset marker. Can Reserve() space from current page, or move to NewPage().
class AtomicPageOffset {
 public:
  AtomicPageOffset()
    : control_{ 0 } {
  }

  AtomicPageOffset(uint32_t page, uint64_t offset)
    : control_{ PageOffset{ page, offset } .control() } {
  }

  AtomicPageOffset(const Address& address) {
    PageOffset page_offset{ address };
    control_.store(page_offset.control());
  }

  /// Reserve space within the current page. Can overflow the page boundary (so result offset >
  /// Address::kMaxOffset).
  inline PageOffset Reserve(uint32_t num_slots) {
    assert(num_slots <= Address::kMaxOffset);
    PageOffset offset{ 0, num_slots };
    return PageOffset{ control_.fetch_add(offset.control()) };
  }

  /// Move to the next page. The compare-and-swap can fail. Returns "true" if some thread advanced
  /// the page; sets "won_cas" = "true" if this thread won the CAS, which means it has been
  /// chosen to set up the new page.
  inline bool NewPage(uint32_t old_page, bool& won_cas) {
    assert(old_page < Address::kMaxPage);
    won_cas = false;
    PageOffset expected_page_offset = load();
    if(old_page != expected_page_offset.page()) {
      // Another thread already moved to the new page.
      assert(old_page < expected_page_offset.page());
      return true;
    }
    PageOffset new_page{ old_page + 1, 0 };
    uint64_t expected = expected_page_offset.control();
    // Try to move to a new page.
    won_cas = control_.compare_exchange_strong(expected, new_page.control());
    return PageOffset{ expected } .page() > old_page;
  }

  inline PageOffset load() const {
    return PageOffset{ control_.load() };
  }
  inline void store(Address address) {
    PageOffset page_offset{ address.page(), address.offset() };
    control_.store(page_offset.control());
  }

 private:
  union {
    /// Atomic access to the page+offset.
    std::atomic<uint64_t> control_;
  };
};
static_assert(sizeof(AtomicPageOffset) == 8, "sizeof(AtomicPageOffset) != 8");
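
// Illustrative sketch (not part of the allocator): how Reserve() and NewPage() cooperate.
// Reserve() is a single fetch_add on the 64-bit control word, so it never fails; the caller
// detects a page overflow from the returned (pre-increment) offset, exactly as Allocate() does
// below, and then calls NewPage() to advance the tail:
//
//   AtomicPageOffset tail{ /*page*/ 3, /*offset*/ Address::kMaxOffset - 8 };
//   PageOffset claimed = tail.Reserve(64);
//   if(claimed.offset() + 64 > Address::kMaxOffset + 1) {
//     // The reservation spilled past the page boundary; move the tail to page 4 and retry.
//     bool won_cas;
//     tail.NewPage(claimed.page(), won_cas);
//   }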

/// The main allocator.
template <class D>
class PersistentMemoryMalloc {
 public:
  typedef D disk_t;
  typedef typename D::file_t file_t;
  typedef typename D::log_file_t log_file_t;
  typedef PersistentMemoryMalloc<disk_t> alloc_t;

  /// Each page in the buffer is 2^25 bytes (= 32 MB).
  static constexpr uint64_t kPageSize = Address::kMaxOffset + 1;

  /// The first 4 HLOG pages should be below the head (i.e., being flushed to disk).
  static constexpr uint32_t kNumHeadPages = 4;
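
  /// Sizing example (illustrative): a 1 GB in-memory log gives buffer_size_ = 1 GB / 32 MB = 32
  /// pages. With kNumHeadPages = 4 and, say, log_mutable_fraction = 0.9, the mutable region is
  /// the latest static_cast<uint32_t>(0.9 * 32) = 28 pages; older pages become read-only, are
  /// flushed, and can then be dropped from the circular buffer.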

  PersistentMemoryMalloc(uint64_t log_size, LightEpoch& epoch, disk_t& disk_, log_file_t& file_,
                         Address start_address, double log_mutable_fraction)
    : sector_size{ static_cast<uint32_t>(file_.alignment()) }
    , epoch_{ &epoch }
    , disk{ &disk_ }
    , file{ &file_ }
    , read_buffer_pool{ 1, sector_size }
    , io_buffer_pool{ 1, sector_size }
    , read_only_address{ start_address }
    , safe_read_only_address{ start_address }
    , head_address{ start_address }
    , safe_head_address{ start_address }
    , flushed_until_address{ start_address }
    , begin_address{ start_address }
    , tail_page_offset_{ start_address }
    , buffer_size_{ 0 }
    , pages_{ nullptr }
    , page_status_{ nullptr } {
    assert(start_address.page() <= Address::kMaxPage);

    if(log_size % kPageSize != 0) {
      throw std::invalid_argument{ "Log size must be a multiple of 32 MB" };
    }
    if(log_size / kPageSize > UINT32_MAX) {
      throw std::invalid_argument{ "Log size must be <= 128 PB" };
    }
    buffer_size_ = static_cast<uint32_t>(log_size / kPageSize);

    if(buffer_size_ <= kNumHeadPages + 1) {
      throw std::invalid_argument{ "Must have at least 2 non-head pages" };
    }
    // The latest N pages should be mutable.
    num_mutable_pages_ = static_cast<uint32_t>(log_mutable_fraction * buffer_size_);
    if(num_mutable_pages_ <= 1) {
      // Need at least two mutable pages: one to write to, and one to open up when the previous
      // mutable page is full.
      throw std::invalid_argument{ "Must have at least 2 mutable pages" };
    }

    pages_ = new uint8_t* [buffer_size_];
    for(uint32_t idx = 0; idx < buffer_size_; ++idx) {
      pages_[idx] = nullptr;
    }

    page_status_ = new FullPageStatus[buffer_size_];

    PageOffset tail_page_offset = tail_page_offset_.load();
    AllocatePage(tail_page_offset.page());
    AllocatePage(tail_page_offset.page() + 1);
  }

  PersistentMemoryMalloc(uint64_t log_size, LightEpoch& epoch, disk_t& disk_, log_file_t& file_,
                         double log_mutable_fraction)
    : PersistentMemoryMalloc(log_size, epoch, disk_, file_, Address{ 0 }, log_mutable_fraction) {
    /// Allocate the invalid page. Supports allocations aligned up to kCacheLineBytes.
    uint32_t discard;
    Allocate(Constants::kCacheLineBytes, discard);
    assert(discard == UINT32_MAX);
    /// Move the head and read-only address past the invalid page.
    Address tail_address = tail_page_offset_.load();
    begin_address.store(tail_address);
    read_only_address.store(tail_address);
    safe_read_only_address.store(tail_address);
    head_address.store(tail_address);
    safe_head_address.store(tail_address);
  }
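
  /// Construction sketch (illustrative only; `epoch`, `disk`, and `disk.log()` stand for the
  /// epoch and device instances the embedding FASTER store already owns, and are assumptions
  /// rather than part of this header):
  ///
  ///   PersistentMemoryMalloc<disk_t> hlog{ 1073741824ULL /* 1 GB log */, epoch, disk, disk.log(),
  ///                                        0.9 /* 90% of in-memory pages stay mutable */ };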

  ~PersistentMemoryMalloc() {
    if(pages_) {
      for(uint32_t idx = 0; idx < buffer_size_; ++idx) {
        if(pages_[idx]) {
          aligned_free(pages_[idx]);
        }
      }
      delete[] pages_;
    }
    if(page_status_) {
      delete[] page_status_;
    }
  }

  inline const uint8_t* Page(uint32_t page) const {
    assert(page <= Address::kMaxPage);
    return pages_[page % buffer_size_];
  }
  inline uint8_t* Page(uint32_t page) {
    assert(page <= Address::kMaxPage);
    return pages_[page % buffer_size_];
  }

  inline const FullPageStatus& PageStatus(uint32_t page) const {
    assert(page <= Address::kMaxPage);
    return page_status_[page % buffer_size_];
  }
  inline FullPageStatus& PageStatus(uint32_t page) {
    assert(page <= Address::kMaxPage);
    return page_status_[page % buffer_size_];
  }

  inline uint32_t buffer_size() const {
    return buffer_size_;
  }

  /// Read the tail page + offset, atomically, and convert it to an address.
  inline Address GetTailAddress() const {
    PageOffset tail_page_offset = tail_page_offset_.load();
    return Address{ tail_page_offset.page(), std::min(Address::kMaxOffset,
                    static_cast<uint32_t>(tail_page_offset.offset())) };
  }

  inline const uint8_t* Get(Address address) const {
    return Page(address.page()) + address.offset();
  }
  inline uint8_t* Get(Address address) {
    return Page(address.page()) + address.offset();
  }

  /// Key function used to allocate memory for a specified number of items. If the current page is
  /// full, returns Address::kInvalidAddress and sets closed_page to the current page index. The
  /// caller should Refresh() the epoch and call NewPage() until successful, before trying to
  /// Allocate() again.
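  ///
  /// A typical caller loop looks roughly like this (illustrative sketch; `hlog`, `record_size`,
  /// and `Refresh()` stand for the embedding store's allocator instance, record size, and
  /// epoch-refresh hook, and are assumptions rather than part of this header):
  ///
  ///   uint32_t closed_page;
  ///   Address address = hlog.Allocate(record_size, closed_page);
  ///   while(address == Address::kInvalidAddress) {
  ///     Refresh();                     // refresh the epoch, as described above
  ///     hlog.NewPage(closed_page);     // retry until the tail moves to a fresh page
  ///     address = hlog.Allocate(record_size, closed_page);
  ///   }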
  inline Address Allocate(uint32_t num_slots, uint32_t& closed_page);

  /// Tries to move the allocator to a new page; used when the current page is full. Returns "true"
  /// if the page advanced (so the caller can try to allocate again).
  inline bool NewPage(uint32_t old_page);

  /// Invoked by users to obtain a record from disk. It uses sector-aligned memory to read
  /// the record efficiently into memory.
  inline void AsyncGetFromDisk(Address address, uint32_t num_records, AsyncIOCallback callback,
                               AsyncIOContext& context);

  /// Used by applications to quickly make the current state of the database immutable.
  Address ShiftReadOnlyToTail();

  void Truncate(GcState::truncate_callback_t callback);

  /// Action to be performed when all threads have agreed that a page range is closed.
  class OnPagesClosed_Context : public IAsyncContext {
   public:
    OnPagesClosed_Context(alloc_t* allocator_,
                          Address new_safe_head_address_,
                          bool replace_with_clean_page_)
      : allocator{ allocator_ }
      , new_safe_head_address{ new_safe_head_address_ }
      , replace_with_clean_page{ replace_with_clean_page_ } {
    }

    /// The deep-copy constructor.
    OnPagesClosed_Context(const OnPagesClosed_Context& other)
      : allocator{ other.allocator }
      , new_safe_head_address{ other.new_safe_head_address }
      , replace_with_clean_page{ other.replace_with_clean_page } {
    }

   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }

   public:
    alloc_t* allocator;
    Address new_safe_head_address;
    bool replace_with_clean_page;
  };

  static void OnPagesClosed(IAsyncContext* ctxt);

  /// Seal: make sure there are no longer any threads writing to the page.
  /// Flush: send the page to the secondary store.
  class OnPagesMarkedReadOnly_Context : public IAsyncContext {
   public:
    OnPagesMarkedReadOnly_Context(alloc_t* allocator_,
                                  Address new_safe_read_only_address_,
                                  bool wait_for_pending_flush_complete_)
      : allocator{ allocator_ }
      , new_safe_read_only_address{ new_safe_read_only_address_ }
      , wait_for_pending_flush_complete{ wait_for_pending_flush_complete_ } {
    }

    /// The deep-copy constructor.
    OnPagesMarkedReadOnly_Context(const OnPagesMarkedReadOnly_Context& other)
      : allocator{ other.allocator }
      , new_safe_read_only_address{ other.new_safe_read_only_address }
      , wait_for_pending_flush_complete{ other.wait_for_pending_flush_complete } {
    }

   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }

   public:
    alloc_t* allocator;
    Address new_safe_read_only_address;
    bool wait_for_pending_flush_complete;
  };

  static void OnPagesMarkedReadOnly(IAsyncContext* ctxt);

 private:
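  /// Computes the sector-aligned window that must be read from disk to cover
  /// [read_offset, read_offset + read_length). Worked example (illustrative): with a 512-byte
  /// sector, read_offset = 1000 and read_length = 100 give begin_read = 512, end_read = 1536,
  /// offset = 488 (the record's position inside the aligned buffer), and length = 1024.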
  inline void GetFileReadBoundaries(Address read_offset, uint32_t read_length,
                                    uint64_t& begin_read, uint64_t& end_read, uint32_t& offset,
                                    uint32_t& length) const {
    assert(sector_size > 0);
    assert(Utility::IsPowerOfTwo(sector_size));
    assert(sector_size <= UINT32_MAX);
    size_t alignment_mask = sector_size - 1;
    // Align read to sector boundary.
    begin_read = read_offset.control() & ~alignment_mask;
    end_read = (read_offset.control() + read_length + alignment_mask) & ~alignment_mask;
    offset = static_cast<uint32_t>(read_offset.control() & alignment_mask);
    assert(end_read - begin_read <= UINT32_MAX);
    length = static_cast<uint32_t>(end_read - begin_read);
  }

  /// Allocate a memory page, in sector-aligned form.
  inline void AllocatePage(uint32_t index);

  /// Used by several functions to monotonically advance "variable" to "new_value". The update is
  /// skipped (and "false" returned) if "new_value" is not greater than the current value.
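  /// For example (illustrative), advancing the read-only marker only if the tail is ahead of it,
  /// as ShiftReadOnlyToTail() does below:
  ///
  ///   Address old_read_only_address;
  ///   if(MonotonicUpdate(read_only_address, tail_address, old_read_only_address)) {
  ///     // We won the race; old_read_only_address holds the value we advanced from.
  ///   }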
  template <typename A, typename T>
  inline bool MonotonicUpdate(A& variable, T new_value,
                              T& old_value) {
    old_value = variable.load();
    while(old_value < new_value) {
      if(variable.compare_exchange_strong(old_value, new_value)) {
        return true;
      }
    }
    return false;
  }

  Status AsyncFlushPages(uint32_t start_page, Address until_address,
                         bool serialize_objects = false);

 public:
  Status AsyncFlushPagesToFile(uint32_t start_page, Address until_address, file_t& file,
                               std::atomic<uint32_t>& flush_pending);

  /// Recovery.
  Status AsyncReadPagesFromLog(uint32_t start_page, uint32_t num_pages,
                               RecoveryStatus& recovery_status);
  Status AsyncReadPagesFromSnapshot(file_t& snapshot_file, uint32_t file_start_page,
                                    uint32_t start_page, uint32_t num_pages,
                                    RecoveryStatus& recovery_status);

  Status AsyncFlushPage(uint32_t page, RecoveryStatus& recovery_status,
                        AsyncCallback caller_callback, IAsyncContext* caller_context);
  void RecoveryReset(Address begin_address_, Address head_address_, Address tail_address);

 private:
  template <class F>
  Status AsyncReadPages(F& read_file, uint32_t file_start_page, uint32_t start_page,
                        uint32_t num_pages, RecoveryStatus& recovery_status);
  inline void PageAlignedShiftHeadAddress(uint32_t tail_page);
  inline void PageAlignedShiftReadOnlyAddress(uint32_t tail_page);

  /// Every async flush callback tries to advance the flushed-until address to the latest value
  /// possible.
  /// Is there a better way to do this that enables fine-grained addresses (not necessarily at
  /// page boundaries)?
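  /// For example (illustrative): if flushed_until_address currently points into page 3, page 3's
  /// LastFlushedUntilAddress has reached the start of page 4, and page 4's has reached some
  /// offset within page 4, a single call walks both entries and advances flushed_until_address
  /// into page 4, even though the two per-page flushes completed out of order.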
  inline void ShiftFlushedUntilAddress() {
    Address current_flushed_until_address = flushed_until_address.load();
    uint32_t page = current_flushed_until_address.page();

    bool update = false;
    Address page_last_flushed_address = PageStatus(page).LastFlushedUntilAddress.load();
    while(page_last_flushed_address >= current_flushed_until_address) {
      current_flushed_until_address = page_last_flushed_address;
      update = true;
      ++page;
      page_last_flushed_address = PageStatus(page).LastFlushedUntilAddress.load();
    }

    if(update) {
      Address discard;
      MonotonicUpdate(flushed_until_address, current_flushed_until_address, discard);
    }
  }

 public:
  uint32_t sector_size;

 private:
  LightEpoch* epoch_;
  disk_t* disk;

 public:
  log_file_t* file;
  // Read buffer pool
  NativeSectorAlignedBufferPool read_buffer_pool;
  NativeSectorAlignedBufferPool io_buffer_pool;

  /// Every address < ReadOnlyAddress is read-only.
  AtomicAddress read_only_address;
  /// The minimum ReadOnlyAddress that every thread has seen.
  AtomicAddress safe_read_only_address;

  /// The circular buffer can drop any page < HeadAddress.page()--must read those pages from disk.
  AtomicAddress head_address;
  /// The minimum HeadPage that every thread has seen.
  AtomicAddress safe_head_address;

  AtomicAddress flushed_until_address;

  /// The address of the true head of the log--everything before this address has been truncated
  /// by garbage collection.
  AtomicAddress begin_address;

 private:
  uint32_t buffer_size_;

  /// The latest N pages should be mutable.
  uint32_t num_mutable_pages_;

  // Circular buffer definition
  uint8_t** pages_;

  // Array that indicates the status of each buffer page
  FullPageStatus* page_status_;

  // Global address of the current tail (next element to be allocated from the circular buffer)
  AtomicPageOffset tail_page_offset_;
};

/// Implementations.
template <class D>
inline void PersistentMemoryMalloc<D>::AllocatePage(uint32_t index) {
  index = index % buffer_size_;
  assert(pages_[index] == nullptr);
  pages_[index] = reinterpret_cast<uint8_t*>(aligned_alloc(sector_size, kPageSize));
  std::memset(pages_[index], 0, kPageSize);

  // Mark the page as accessible.
  page_status_[index].status.store(FlushStatus::Flushed, CloseStatus::Open);
}

template <class D>
inline Address PersistentMemoryMalloc<D>::Allocate(uint32_t num_slots, uint32_t& closed_page) {
  closed_page = UINT32_MAX;
  PageOffset page_offset = tail_page_offset_.Reserve(num_slots);

  if(page_offset.offset() + num_slots > kPageSize) {
    // The current page is full. The caller should Refresh() the epoch and wait until
    // NewPage() is successful before trying to Allocate() again.
    closed_page = page_offset.page();
    return Address::kInvalidAddress;
  } else {
    assert(Page(page_offset.page()));
    return static_cast<Address>(page_offset);
  }
}

template <class D>
inline bool PersistentMemoryMalloc<D>::NewPage(uint32_t old_page) {
  assert(old_page < Address::kMaxPage);
  PageOffset new_tail_offset{ old_page + 1, 0 };
  // When the tail advances to page k+1, we clear page k+2.
  if(old_page + 2 >= safe_head_address.page() + buffer_size_) {
    // No room in the circular buffer for a new page; try to advance the head address, to make
    // more room available.
    disk->TryComplete();
    PageAlignedShiftReadOnlyAddress(old_page + 1);
    PageAlignedShiftHeadAddress(old_page + 1);
    return false;
  }
  FlushCloseStatus status = PageStatus(old_page + 1).status.load();
  if(!status.Ready()) {
    // Can't access the next page yet; try to advance the head address, to make the page
    // available.
    disk->TryComplete();
    PageAlignedShiftReadOnlyAddress(old_page + 1);
    PageAlignedShiftHeadAddress(old_page + 1);
    return false;
  }
  bool won_cas;
  bool retval = tail_page_offset_.NewPage(old_page, won_cas);
  if(won_cas) {
    // We moved the tail to (page + 1), so we are responsible for moving the head and
    // read-only addresses.
    PageAlignedShiftReadOnlyAddress(old_page + 1);
    PageAlignedShiftHeadAddress(old_page + 1);
    if(!Page(old_page + 2)) {
      // We are also responsible for allocating (page + 2).
      AllocatePage(old_page + 2);
    }
  }
  return retval;
}

template <class D>
inline void PersistentMemoryMalloc<D>::AsyncGetFromDisk(Address address, uint32_t num_records,
    AsyncIOCallback callback, AsyncIOContext& context) {
  uint64_t begin_read, end_read;
  uint32_t offset, length;
  GetFileReadBoundaries(address, num_records, begin_read, end_read, offset, length);
  context.record = read_buffer_pool.Get(length);
  context.record.valid_offset = offset;
  context.record.available_bytes = length - offset;
  context.record.required_bytes = num_records;

  file->ReadAsync(begin_read, context.record.buffer(), length, callback, context);
}

template <class D>
Address PersistentMemoryMalloc<D>::ShiftReadOnlyToTail() {
  Address tail_address = GetTailAddress();
  Address old_read_only_address;
  if(MonotonicUpdate(read_only_address, tail_address, old_read_only_address)) {
    OnPagesMarkedReadOnly_Context context{ this, tail_address, false };
    IAsyncContext* context_copy;
    Status result = context.DeepCopy(context_copy);
    assert(result == Status::Ok);
    epoch_->BumpCurrentEpoch(OnPagesMarkedReadOnly, context_copy);
  }
  return tail_address;
}

template <class D>
void PersistentMemoryMalloc<D>::Truncate(GcState::truncate_callback_t callback) {
  assert(sector_size > 0);
  assert(Utility::IsPowerOfTwo(sector_size));
  assert(sector_size <= UINT32_MAX);
  size_t alignment_mask = sector_size - 1;
  // Align read to sector boundary.
  uint64_t begin_offset = begin_address.control() & ~alignment_mask;
  file->Truncate(begin_offset, callback);
}

template <class D>
void PersistentMemoryMalloc<D>::OnPagesClosed(IAsyncContext* ctxt) {
  CallbackContext<OnPagesClosed_Context> context{ ctxt };
  Address old_safe_head_address;
  if(context->allocator->MonotonicUpdate(context->allocator->safe_head_address,
                                         context->new_safe_head_address,
                                         old_safe_head_address)) {
    for(uint32_t idx = old_safe_head_address.page(); idx < context->new_safe_head_address.page();
        ++idx) {
      FlushCloseStatus old_status = context->allocator->PageStatus(idx).status.load();
      FlushCloseStatus new_status;
      do {
        new_status = FlushCloseStatus{ old_status.flush, CloseStatus::Closed };
      } while(!context->allocator->PageStatus(idx).status.compare_exchange_weak(old_status,
              new_status));

      if(old_status.flush == FlushStatus::Flushed) {
        // We closed the page after it was flushed, so we are responsible for clearing and
        // reopening it.
        std::memset(context->allocator->Page(idx), 0, kPageSize);
        context->allocator->PageStatus(idx).status.store(FlushStatus::Flushed, CloseStatus::Open);
      }
    }
  }
}

template <class D>
void PersistentMemoryMalloc<D>::OnPagesMarkedReadOnly(IAsyncContext* ctxt) {
  CallbackContext<OnPagesMarkedReadOnly_Context> context{ ctxt };
  Address old_safe_read_only_address;
  if(context->allocator->MonotonicUpdate(context->allocator->safe_read_only_address,
                                         context->new_safe_read_only_address,
                                         old_safe_read_only_address)) {
    context->allocator->AsyncFlushPages(old_safe_read_only_address.page(),
                                        context->new_safe_read_only_address);
  }
}

template <class D>
Status PersistentMemoryMalloc<D>::AsyncFlushPages(uint32_t start_page, Address until_address,
    bool serialize_objects) {
  class Context : public IAsyncContext {
   public:
    Context(alloc_t* allocator_, uint32_t page_, Address until_address_)
      : allocator{ allocator_ }
      , page{ page_ }
      , until_address{ until_address_ } {
    }
    /// The deep-copy constructor
    Context(const Context& other)
      : allocator{ other.allocator }
      , page{ other.page }
      , until_address{ other.until_address } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    alloc_t* allocator;
    uint32_t page;
    Address until_address;
  };

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<Context> context{ ctxt };
    if(result != Status::Ok) {
      fprintf(stderr, "AsyncFlushPages(), error: %u\n", static_cast<uint8_t>(result));
    }
    context->allocator->PageStatus(context->page).LastFlushedUntilAddress.store(
      context->until_address);
    // Set the page status to flushed.
    FlushCloseStatus old_status = context->allocator->PageStatus(context->page).status.load();
    FlushCloseStatus new_status;
    do {
      new_status = FlushCloseStatus{ FlushStatus::Flushed, old_status.close };
    } while(!context->allocator->PageStatus(context->page).status.compare_exchange_weak(
              old_status, new_status));
    if(old_status.close == CloseStatus::Closed) {
      // We finished flushing the page after it was closed, so we are responsible for clearing and
      // reopening it.
      std::memset(context->allocator->Page(context->page), 0, kPageSize);
      context->allocator->PageStatus(context->page).status.store(FlushStatus::Flushed,
          CloseStatus::Open);
    }
    context->allocator->ShiftFlushedUntilAddress();
  };

  uint32_t num_pages = until_address.page() - start_page;
  if(until_address.offset() > 0) {
    ++num_pages;
  }
  assert(num_pages > 0);

  for(uint32_t flush_page = start_page; flush_page < start_page + num_pages; ++flush_page) {
    Address page_start_address{ flush_page, 0 };
    Address page_end_address{ flush_page + 1, 0 };

    Context context{ this, flush_page, std::min(page_end_address, until_address) };

    // Set status to in-progress.
    FlushCloseStatus old_status = PageStatus(flush_page).status.load();
    FlushCloseStatus new_status;
    do {
      new_status = FlushCloseStatus{ FlushStatus::InProgress, old_status.close };
    } while(!PageStatus(flush_page).status.compare_exchange_weak(old_status, new_status));
    PageStatus(flush_page).LastFlushedUntilAddress.store(0);

    RETURN_NOT_OK(file->WriteAsync(Page(flush_page), kPageSize * flush_page, kPageSize, callback,
                                   context));
  }
  return Status::Ok;
}

template <class D>
Status PersistentMemoryMalloc<D>::AsyncFlushPagesToFile(uint32_t start_page, Address until_address,
    file_t& file, std::atomic<uint32_t>& flush_pending) {
  class Context : public IAsyncContext {
   public:
    Context(std::atomic<uint32_t>& flush_pending_)
      : flush_pending{ flush_pending_ } {
    }
    /// The deep-copy constructor
    Context(Context& other)
      : flush_pending{ other.flush_pending } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    std::atomic<uint32_t>& flush_pending;
  };

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<Context> context{ ctxt };
    if(result != Status::Ok) {
      fprintf(stderr, "AsyncFlushPagesToFile(), error: %u\n", static_cast<uint8_t>(result));
    }
    assert(context->flush_pending > 0);
    --context->flush_pending;
  };

  uint32_t num_pages = until_address.page() - start_page;
  if(until_address.offset() > 0) {
    ++num_pages;
  }
  assert(num_pages > 0);
  flush_pending = num_pages;

  for(uint32_t flush_page = start_page; flush_page < start_page + num_pages; ++flush_page) {
    Address page_start_address{ flush_page, 0 };
    Address page_end_address{ flush_page + 1, 0 };
    Context context{ flush_pending };
    RETURN_NOT_OK(file.WriteAsync(Page(flush_page), kPageSize * (flush_page - start_page),
                                  kPageSize, callback, context));
  }
  return Status::Ok;
}

template <class D>
Status PersistentMemoryMalloc<D>::AsyncReadPagesFromLog(uint32_t start_page, uint32_t num_pages,
    RecoveryStatus& recovery_status) {
  return AsyncReadPages(*file, 0, start_page, num_pages, recovery_status);
}

template <class D>
Status PersistentMemoryMalloc<D>::AsyncReadPagesFromSnapshot(file_t& snapshot_file,
    uint32_t file_start_page, uint32_t start_page, uint32_t num_pages,
    RecoveryStatus& recovery_status) {
  return AsyncReadPages(snapshot_file, file_start_page, start_page, num_pages, recovery_status);
}

template <class D>
template <class F>
Status PersistentMemoryMalloc<D>::AsyncReadPages(F& read_file, uint32_t file_start_page,
    uint32_t start_page, uint32_t num_pages, RecoveryStatus& recovery_status) {
  class Context : public IAsyncContext {
   public:
    Context(std::atomic<PageRecoveryStatus>& page_status_)
      : page_status{ &page_status_ } {
    }
    /// The deep-copy constructor
    Context(const Context& other)
      : page_status{ other.page_status } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    std::atomic<PageRecoveryStatus>* page_status;
  };

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<Context> context{ ctxt };
    if(result != Status::Ok) {
      fprintf(stderr, "Error: %u\n", static_cast<uint8_t>(result));
    }
    assert(context->page_status->load() == PageRecoveryStatus::IssuedRead);
    context->page_status->store(PageRecoveryStatus::ReadDone);
  };

  for(uint32_t read_page = start_page; read_page < start_page + num_pages; ++read_page) {
    if(!Page(read_page)) {
      // Allocate a new page.
      AllocatePage(read_page);
    } else {
      // Clear an old used page.
      std::memset(Page(read_page), 0, kPageSize);
    }
    assert(recovery_status.page_status(read_page) == PageRecoveryStatus::NotStarted);
    recovery_status.page_status(read_page).store(PageRecoveryStatus::IssuedRead);
    PageStatus(read_page).LastFlushedUntilAddress.store(Address{ read_page + 1, 0 });
    Context context{ recovery_status.page_status(read_page) };
    RETURN_NOT_OK(read_file.ReadAsync(kPageSize * (read_page - file_start_page), Page(read_page),
                                      kPageSize, callback, context));
  }
  return Status::Ok;
}

template <class D>
Status PersistentMemoryMalloc<D>::AsyncFlushPage(uint32_t page, RecoveryStatus& recovery_status,
    AsyncCallback caller_callback, IAsyncContext* caller_context) {
  class Context : public IAsyncContext {
   public:
    Context(std::atomic<PageRecoveryStatus>& page_status_, AsyncCallback caller_callback_,
            IAsyncContext* caller_context_)
      : page_status{ &page_status_ }
      , caller_callback{ caller_callback_ }
      , caller_context{ caller_context_ } {
    }
    /// The deep-copy constructor
    Context(const Context& other, IAsyncContext* caller_context_copy)
      : page_status{ other.page_status }
      , caller_callback{ other.caller_callback }
      , caller_context{ caller_context_copy } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      if(caller_callback) {
        return IAsyncContext::DeepCopy_Internal(*this, caller_context, context_copy);
      } else {
        return IAsyncContext::DeepCopy_Internal(*this, context_copy);
      }
    }
   public:
    std::atomic<PageRecoveryStatus>* page_status;
    AsyncCallback caller_callback;
    IAsyncContext* caller_context;
  };

  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<Context> context{ ctxt };
    if(result != Status::Ok) {
      fprintf(stderr, "Error: %u\n", static_cast<uint8_t>(result));
    }
    assert(context->page_status->load() == PageRecoveryStatus::IssuedFlush);
    context->page_status->store(PageRecoveryStatus::FlushDone);
    if(context->caller_callback) {
      context->caller_callback(context->caller_context, result);
    }
  };

  assert(recovery_status.page_status(page) == PageRecoveryStatus::ReadDone);
  recovery_status.page_status(page).store(PageRecoveryStatus::IssuedFlush);
  PageStatus(page).LastFlushedUntilAddress.store(Address{ page + 1, 0 });
  Context context{ recovery_status.page_status(page), caller_callback, caller_context };
  return file->WriteAsync(Page(page), kPageSize * page, kPageSize, callback, context);
}

template <class D>
void PersistentMemoryMalloc<D>::RecoveryReset(Address begin_address_, Address head_address_,
    Address tail_address) {
  begin_address.store(begin_address_);
  tail_page_offset_.store(tail_address);
  // Issue read request to all pages until head lag.
  head_address.store(head_address_);
  safe_head_address.store(head_address_);

  flushed_until_address.store(Address{ tail_address.page(), 0 });
  read_only_address.store(tail_address);
  safe_read_only_address.store(tail_address);

  uint32_t start_page = head_address_.page();
  uint32_t end_page = tail_address.offset() == 0 ? tail_address.page() : tail_address.page() + 1;
  if(!Page(end_page)) {
    AllocatePage(end_page);
  }
  if(!Page(end_page + 1)) {
    AllocatePage(end_page + 1);
  }

  for(uint32_t idx = 0; idx < buffer_size_; ++idx) {
    PageStatus(idx).status.store(FlushStatus::Flushed, CloseStatus::Open);
  }
}

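/// Worked example (illustrative): with buffer_size_ = 32 and kNumHeadPages = 4, once the tail
/// reaches page 40 the desired head page is 40 - (32 - 4) = 12, i.e. at most
/// buffer_size_ - kNumHeadPages pages are kept in memory behind the tail. If flushing lags and
/// flushed_until_address is still in page 10, the head only advances to page 10, since a page
/// can never be dropped from memory before it is durable on disk.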
template <class D>
inline void PersistentMemoryMalloc<D>::PageAlignedShiftHeadAddress(uint32_t tail_page) {
  // Obtain local values of variables that can change.
  Address current_head_address = head_address.load();
  Address current_flushed_until_address = flushed_until_address.load();

  if(tail_page <= (buffer_size_ - kNumHeadPages)) {
    // Desired head address is <= 0.
    return;
  }

  Address desired_head_address{ tail_page - (buffer_size_ - kNumHeadPages), 0 };

  if(current_flushed_until_address < desired_head_address) {
    desired_head_address = Address{ current_flushed_until_address.page(), 0 };
  }

  Address old_head_address;
  if(MonotonicUpdate(head_address, desired_head_address, old_head_address)) {
    OnPagesClosed_Context context{ this, desired_head_address, false };
    IAsyncContext* context_copy;
    Status result = context.DeepCopy(context_copy);
    assert(result == Status::Ok);
    epoch_->BumpCurrentEpoch(OnPagesClosed, context_copy);
  }
}

template <class D>
inline void PersistentMemoryMalloc<D>::PageAlignedShiftReadOnlyAddress(uint32_t tail_page) {
  Address current_read_only_address = read_only_address.load();
  if(tail_page <= num_mutable_pages_) {
    // Desired read-only address is <= 0.
    return;
  }

  Address desired_read_only_address{ tail_page - num_mutable_pages_, 0 };
  Address old_read_only_address;
  if(MonotonicUpdate(read_only_address, desired_read_only_address, old_read_only_address)) {
    OnPagesMarkedReadOnly_Context context{ this, desired_read_only_address, false };
    IAsyncContext* context_copy;
    Status result = context.DeepCopy(context_copy);
    assert(result == Status::Ok);
    epoch_->BumpCurrentEpoch(OnPagesMarkedReadOnly, context_copy);
  }
}

} // namespace core
} // namespace FASTER