1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
6#include <atomic>
7#include <cassert>
8#include <cinttypes>
9#include <cstdint>
10#include <cstdio>
11#include <cstring>
12#include <type_traits>
13
14#include "device/file_system_disk.h"
15
16#include "alloc.h"
17#include "checkpoint_locks.h"
18#include "checkpoint_state.h"
19#include "constants.h"
20#include "gc_state.h"
21#include "grow_state.h"
22#include "guid.h"
23#include "hash_table.h"
24#include "internal_contexts.h"
25#include "key_hash.h"
26#include "malloc_fixed_page_size.h"
27#include "persistent_memory_malloc.h"
28#include "record.h"
29#include "recovery_status.h"
30#include "state_transitions.h"
31#include "status.h"
32#include "utility.h"
33
34using namespace std::chrono_literals;
35
36/// The FASTER key-value store, and related classes.
37
38namespace FASTER {
39namespace core {
40
41class alignas(Constants::kCacheLineBytes) ThreadContext {
42 public:
43 ThreadContext()
44 : contexts_{}
45 , cur_{ 0 } {
46 }
47
48 inline const ExecutionContext& cur() const {
49 return contexts_[cur_];
50 }
51 inline ExecutionContext& cur() {
52 return contexts_[cur_];
53 }
54
55 inline const ExecutionContext& prev() const {
56 return contexts_[(cur_ + 1) % 2];
57 }
58 inline ExecutionContext& prev() {
59 return contexts_[(cur_ + 1) % 2];
60 }
61
62 inline void swap() {
63 cur_ = (cur_ + 1) % 2;
64 }
65
66 private:
67 ExecutionContext contexts_[2];
68 uint8_t cur_;
69};
70static_assert(sizeof(ThreadContext) == 448, "sizeof(ThreadContext) != 448");
71
72/// The FASTER key-value store.
73template <class K, class V, class D>
74class FasterKv {
75 public:
76 typedef FasterKv<K, V, D> faster_t;
77
78 /// Keys and values stored in this key-value store.
79 typedef K key_t;
80 typedef V value_t;
81
82 typedef D disk_t;
83 typedef typename D::file_t file_t;
84 typedef typename D::log_file_t log_file_t;
85
86 typedef PersistentMemoryMalloc<disk_t> hlog_t;
87
88 /// Contexts that have been deep-copied, for async continuations, and must be accessed via
89 /// virtual function calls.
90 typedef AsyncPendingReadContext<key_t> async_pending_read_context_t;
91 typedef AsyncPendingUpsertContext<key_t> async_pending_upsert_context_t;
92 typedef AsyncPendingRmwContext<key_t> async_pending_rmw_context_t;
93
94 FasterKv(uint64_t table_size, uint64_t log_size, const std::string& filename,
95 double log_mutable_fraction = 0.9)
96 : min_table_size_{ table_size }
97 , disk{ filename, epoch_ }
98 , hlog{ log_size, epoch_, disk, disk.log(), log_mutable_fraction }
99 , system_state_{ Action::None, Phase::REST, 1 }
100 , num_pending_ios{ 0 } {
101 if(!Utility::IsPowerOfTwo(table_size)) {
102 throw std::invalid_argument{ " Size is not a power of 2" };
103 }
104 if(table_size > INT32_MAX) {
105 throw std::invalid_argument{ " Cannot allocate such a large hash table " };
106 }
107
108 resize_info_.version = 0;
109 state_[0].Initialize(table_size, disk.log().alignment());
110 overflow_buckets_allocator_[0].Initialize(disk.log().alignment(), epoch_);
111 }
112
113 // No copy constructor.
114 FasterKv(const FasterKv& other) = delete;
115
116 public:
117 /// Thread-related operations
118 Guid StartSession();
119 uint64_t ContinueSession(const Guid& guid);
120 void StopSession();
121 void Refresh();
122
123 /// Store interface
124 template <class RC>
125 inline Status Read(RC& context, AsyncCallback callback, uint64_t monotonic_serial_num);
126
127 template <class UC>
128 inline Status Upsert(UC& context, AsyncCallback callback, uint64_t monotonic_serial_num);
129
130 template <class MC>
131 inline Status Rmw(MC& context, AsyncCallback callback, uint64_t monotonic_serial_num);
132 /// Delete() not yet implemented!
133 // void Delete(const Key& key, Context& context, uint64_t lsn);
134 inline bool CompletePending(bool wait = false);
135
136 /// Checkpoint/recovery operations.
137 bool Checkpoint(void(*index_persistence_callback)(Status result),
138 void(*hybrid_log_persistence_callback)(Status result,
139 uint64_t persistent_serial_num), Guid& token);
140 bool CheckpointIndex(void(*index_persistence_callback)(Status result), Guid& token);
141 bool CheckpointHybridLog(void(*hybrid_log_persistence_callback)(Status result,
142 uint64_t persistent_serial_num), Guid& token);
143 Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version,
144 std::vector<Guid>& session_ids);
145
146 /// Truncating the head of the log.
147 bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback,
148 GcState::complete_callback_t complete_callback);
149
150 /// Make the hash table larger.
151 bool GrowIndex(GrowState::callback_t caller_callback);
152
153 /// Statistics
154 inline uint64_t Size() const {
155 return hlog.GetTailAddress().control();
156 }
157 inline void DumpDistribution() {
158 state_[resize_info_.version].DumpDistribution(
159 overflow_buckets_allocator_[resize_info_.version]);
160 }
161
162 private:
163 typedef Record<key_t, value_t> record_t;
164
165 typedef PendingContext<key_t> pending_context_t;
166
167 template <class C>
168 inline OperationStatus InternalRead(C& pending_context) const;
169
170 template <class C>
171 inline OperationStatus InternalUpsert(C& pending_context);
172
173 template <class C>
174 inline OperationStatus InternalRmw(C& pending_context, bool retrying);
175
176 inline OperationStatus InternalRetryPendingRmw(async_pending_rmw_context_t& pending_context);
177
178 OperationStatus InternalContinuePendingRead(ExecutionContext& ctx,
179 AsyncIOContext& io_context);
180 OperationStatus InternalContinuePendingRmw(ExecutionContext& ctx,
181 AsyncIOContext& io_context);
182
183 // Find the hash bucket entry, if any, corresponding to the specified hash.
184 inline const AtomicHashBucketEntry* FindEntry(KeyHash hash) const;
185 // If a hash bucket entry corresponding to the specified hash exists, return it; otherwise,
186 // create a new entry. The caller can use the "expected_entry" to CAS its desired address into
187 // the entry.
188 inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry,
189 HashBucket*& bucket);
190 inline Address TraceBackForKeyMatch(const key_t& key, Address from_address,
191 Address min_offset) const;
192 Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address,
193 Address min_address, uint8_t side);
194
195 // If a hash bucket entry corresponding to the specified hash exists, return it; otherwise,
196 // return an unused bucket entry.
197 inline AtomicHashBucketEntry* FindTentativeEntry(KeyHash hash, HashBucket* bucket,
198 uint8_t version, HashBucketEntry& expected_entry);
199 // Looks for an entry that has the same
200 inline bool HasConflictingEntry(KeyHash hash, const HashBucket* bucket, uint8_t version,
201 const AtomicHashBucketEntry* atomic_entry) const;
202
203 inline Address BlockAllocate(uint32_t record_size);
204
205 inline Status HandleOperationStatus(ExecutionContext& ctx,
206 pending_context_t& pending_context,
207 OperationStatus internal_status, bool& async);
208 inline Status PivotAndRetry(ExecutionContext& ctx, pending_context_t& pending_context,
209 bool& async);
210 inline Status RetryLater(ExecutionContext& ctx, pending_context_t& pending_context,
211 bool& async);
212 inline constexpr uint32_t MinIoRequestSize() const;
213 inline Status IssueAsyncIoRequest(ExecutionContext& ctx, pending_context_t& pending_context,
214 bool& async);
215
216 void AsyncGetFromDisk(Address address, uint32_t num_records, AsyncIOCallback callback,
217 AsyncIOContext& context);
218 static void AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result,
219 size_t bytes_transferred);
220
221 void CompleteIoPendingRequests(ExecutionContext& context);
222 void CompleteRetryRequests(ExecutionContext& context);
223
224 void InitializeCheckpointLocks();
225
226 /// Checkpoint/recovery methods.
227 void HandleSpecialPhases();
228 bool GlobalMoveToNextState(SystemState current_state);
229
230 Status CheckpointFuzzyIndex();
231 Status CheckpointFuzzyIndexComplete();
232 Status RecoverFuzzyIndex();
233 Status RecoverFuzzyIndexComplete(bool wait);
234
235 Status WriteIndexMetadata();
236 Status ReadIndexMetadata(const Guid& token);
237 Status WriteCprMetadata();
238 Status ReadCprMetadata(const Guid& token);
239 Status WriteCprContext();
240 Status ReadCprContexts(const Guid& token, const Guid* guids);
241
242 Status RecoverHybridLog();
243 Status RecoverHybridLogFromSnapshotFile();
244 Status RecoverFromPage(Address from_address, Address to_address);
245 Status RestoreHybridLog();
246
247 void MarkAllPendingRequests();
248
249 inline void HeavyEnter();
250 bool CleanHashTableBuckets();
251 void SplitHashTableBuckets();
252 void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version,
253 HashBucketEntry entry);
254
255 /// Access the current and previous (thread-local) execution contexts.
256 const ExecutionContext& thread_ctx() const {
257 return thread_contexts_[Thread::id()].cur();
258 }
259 ExecutionContext& thread_ctx() {
260 return thread_contexts_[Thread::id()].cur();
261 }
262 ExecutionContext& prev_thread_ctx() {
263 return thread_contexts_[Thread::id()].prev();
264 }
265
266 private:
267 LightEpoch epoch_;
268
269 public:
270 disk_t disk;
271 hlog_t hlog;
272
273 private:
274 static constexpr bool kCopyReadsToTail = false;
275 static constexpr uint64_t kGcHashTableChunkSize = 16384;
276 static constexpr uint64_t kGrowHashTableChunkSize = 16384;
277
278 bool fold_over_snapshot = true;
279
280 /// Initial size of the table
281 uint64_t min_table_size_;
282
283 // Allocator for the hash buckets that don't fit in the hash table.
284 MallocFixedPageSize<HashBucket, disk_t> overflow_buckets_allocator_[2];
285
286 // An array of size two, that contains the old and new versions of the hash-table
287 InternalHashTable<disk_t> state_[2];
288
289 CheckpointLocks checkpoint_locks_;
290
291 ResizeInfo resize_info_;
292
293 AtomicSystemState system_state_;
294
295 /// Checkpoint/recovery state.
296 CheckpointState<file_t> checkpoint_;
297 /// Garbage collection state.
298 GcState gc_;
299 /// Grow (hash table) state.
300 GrowState grow_;
301
302 /// Global count of pending I/Os, used for throttling.
303 std::atomic<uint64_t> num_pending_ios;
304
305 /// Space for two contexts per thread, stored inline.
306 ThreadContext thread_contexts_[Thread::kMaxNumThreads];
307};
308
309// Implementations.
310template <class K, class V, class D>
311inline Guid FasterKv<K, V, D>::StartSession() {
312 SystemState state = system_state_.load();
313 if(state.phase != Phase::REST) {
314 throw std::runtime_error{ "Can acquire only in REST phase!" };
315 }
316 thread_ctx().Initialize(state.phase, state.version, Guid::Create(), 0);
317 Refresh();
318 return thread_ctx().guid;
319}
320
321template <class K, class V, class D>
322inline uint64_t FasterKv<K, V, D>::ContinueSession(const Guid& session_id) {
323 auto iter = checkpoint_.continue_tokens.find(session_id);
324 if(iter == checkpoint_.continue_tokens.end()) {
325 throw std::invalid_argument{ "Unknown session ID" };
326 }
327
328 SystemState state = system_state_.load();
329 if(state.phase != Phase::REST) {
330 throw std::runtime_error{ "Can continue only in REST phase!" };
331 }
332 thread_ctx().Initialize(state.phase, state.version, session_id, iter->second);
333 Refresh();
334 return iter->second;
335}
336
337template <class K, class V, class D>
338inline void FasterKv<K, V, D>::Refresh() {
339 epoch_.ProtectAndDrain();
340 // We check if we are in normal mode
341 SystemState new_state = system_state_.load();
342 if(thread_ctx().phase == Phase::REST && new_state.phase == Phase::REST) {
343 return;
344 }
345 HandleSpecialPhases();
346}
347
348template <class K, class V, class D>
349inline void FasterKv<K, V, D>::StopSession() {
350 // If this thread is still involved in some activity, wait until it finishes.
351 while(thread_ctx().phase != Phase::REST ||
352 !thread_ctx().pending_ios.empty() ||
353 !thread_ctx().retry_requests.empty()) {
354 CompletePending(false);
355 std::this_thread::yield();
356 }
357
358 assert(thread_ctx().retry_requests.empty());
359 assert(thread_ctx().pending_ios.empty());
360 assert(thread_ctx().io_responses.empty());
361
362 assert(prev_thread_ctx().retry_requests.empty());
363 assert(prev_thread_ctx().pending_ios.empty());
364 assert(prev_thread_ctx().io_responses.empty());
365
366 assert(thread_ctx().phase == Phase::REST);
367
368 epoch_.Unprotect();
369}
370
371template <class K, class V, class D>
372inline const AtomicHashBucketEntry* FasterKv<K, V, D>::FindEntry(KeyHash hash) const {
373 // Truncate the hash to get a bucket page_index < state[version].size.
374 uint32_t version = resize_info_.version;
375 const HashBucket* bucket = &state_[version].bucket(hash);
376 assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0);
377
378 while(true) {
379 // Search through the bucket looking for our key. Last entry is reserved
380 // for the overflow pointer.
381 for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
382 HashBucketEntry entry = bucket->entries[entry_idx].load();
383 if(entry.unused()) {
384 continue;
385 }
386 if(hash.tag() == entry.tag()) {
387 // Found a matching tag. (So, the input hash matches the entry on 14 tag bits +
388 // log_2(table size) address bits.)
389 if(!entry.tentative()) {
390 // If (final key, return immediately)
391 return &bucket->entries[entry_idx];
392 }
393 }
394 }
395
396 // Go to next bucket in the chain
397 HashBucketOverflowEntry entry = bucket->overflow_entry.load();
398 if(entry.unused()) {
399 // No more buckets in the chain.
400 return nullptr;
401 }
402 bucket = &overflow_buckets_allocator_[version].Get(entry.address());
403 assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0);
404 }
405 assert(false);
406 return nullptr; // NOT REACHED
407}
408
409template <class K, class V, class D>
410inline AtomicHashBucketEntry* FasterKv<K, V, D>::FindTentativeEntry(KeyHash hash,
411 HashBucket* bucket,
412 uint8_t version, HashBucketEntry& expected_entry) {
413 expected_entry = HashBucketEntry::kInvalidEntry;
414 AtomicHashBucketEntry* atomic_entry = nullptr;
415 // Try to find a slot that contains the right tag or that's free.
416 while(true) {
417 // Search through the bucket looking for our key. Last entry is reserved
418 // for the overflow pointer.
419 for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
420 HashBucketEntry entry = bucket->entries[entry_idx].load();
421 if(entry.unused()) {
422 if(!atomic_entry) {
423 // Found a free slot; keep track of it, and continue looking for a match.
424 atomic_entry = &bucket->entries[entry_idx];
425 }
426 continue;
427 }
428 if(hash.tag() == entry.tag() && !entry.tentative()) {
429 // Found a match. (So, the input hash matches the entry on 14 tag bits +
430 // log_2(table size) address bits.) Return it to caller.
431 expected_entry = entry;
432 return &bucket->entries[entry_idx];
433 }
434 }
435 // Go to next bucket in the chain
436 HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load();
437 if(overflow_entry.unused()) {
438 // No more buckets in the chain.
439 if(atomic_entry) {
440 // We found a free slot earlier (possibly inside an earlier bucket).
441 assert(expected_entry == HashBucketEntry::kInvalidEntry);
442 return atomic_entry;
443 }
444 // We didn't find any free slots, so allocate new bucket.
445 FixedPageAddress new_bucket_addr = overflow_buckets_allocator_[version].Allocate();
446 bool success;
447 do {
448 HashBucketOverflowEntry new_bucket_entry{ new_bucket_addr };
449 success = bucket->overflow_entry.compare_exchange_strong(overflow_entry,
450 new_bucket_entry);
451 } while(!success && overflow_entry.unused());
452 if(!success) {
453 // Install failed, undo allocation; use the winner's entry
454 overflow_buckets_allocator_[version].FreeAtEpoch(new_bucket_addr, 0);
455 } else {
456 // Install succeeded; we have a new bucket on the chain. Return its first slot.
457 bucket = &overflow_buckets_allocator_[version].Get(new_bucket_addr);
458 assert(expected_entry == HashBucketEntry::kInvalidEntry);
459 return &bucket->entries[0];
460 }
461 }
462 // Go to the next bucket.
463 bucket = &overflow_buckets_allocator_[version].Get(overflow_entry.address());
464 assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0);
465 }
466 assert(false);
467 return nullptr; // NOT REACHED
468}
469
470template <class K, class V, class D>
471bool FasterKv<K, V, D>::HasConflictingEntry(KeyHash hash, const HashBucket* bucket, uint8_t version,
472 const AtomicHashBucketEntry* atomic_entry) const {
473 uint16_t tag = atomic_entry->load().tag();
474 while(true) {
475 for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
476 HashBucketEntry entry = bucket->entries[entry_idx].load();
477 if(entry != HashBucketEntry::kInvalidEntry &&
478 entry.tag() == tag &&
479 atomic_entry != &bucket->entries[entry_idx]) {
480 // Found a conflict.
481 return true;
482 }
483 }
484 // Go to next bucket in the chain
485 HashBucketOverflowEntry entry = bucket->overflow_entry.load();
486 if(entry.unused()) {
487 // Reached the end of the bucket chain; no conflicts found.
488 return false;
489 }
490 // Go to the next bucket.
491 bucket = &overflow_buckets_allocator_[version].Get(entry.address());
492 assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0);
493 }
494}
495
496template <class K, class V, class D>
497inline AtomicHashBucketEntry* FasterKv<K, V, D>::FindOrCreateEntry(KeyHash hash,
498 HashBucketEntry& expected_entry, HashBucket*& bucket) {
499 bucket = nullptr;
500 // Truncate the hash to get a bucket page_index < state[version].size.
501 uint32_t version = resize_info_.version;
502 assert(version <= 1);
503
504 while(true) {
505 bucket = &state_[version].bucket(hash);
506 assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0);
507
508 AtomicHashBucketEntry* atomic_entry = FindTentativeEntry(hash, bucket, version,
509 expected_entry);
510 if(expected_entry != HashBucketEntry::kInvalidEntry) {
511 // Found an existing hash bucket entry; nothing further to check.
512 return atomic_entry;
513 }
514 // We have a free slot.
515 assert(atomic_entry);
516 assert(expected_entry == HashBucketEntry::kInvalidEntry);
517 // Try to install tentative tag in free slot.
518 HashBucketEntry entry{ Address::kInvalidAddress, hash.tag(), true };
519 if(atomic_entry->compare_exchange_strong(expected_entry, entry)) {
520 // See if some other thread is also trying to install this tag.
521 if(HasConflictingEntry(hash, bucket, version, atomic_entry)) {
522 // Back off and try again.
523 atomic_entry->store(HashBucketEntry::kInvalidEntry);
524 } else {
525 // No other thread was trying to install this tag, so we can clear our entry's "tentative"
526 // bit.
527 expected_entry = HashBucketEntry{ Address::kInvalidAddress, hash.tag(), false };
528 atomic_entry->store(expected_entry);
529 return atomic_entry;
530 }
531 }
532 }
533 assert(false);
534 return nullptr; // NOT REACHED
535}
536
537template <class K, class V, class D>
538template <class RC>
539inline Status FasterKv<K, V, D>::Read(RC& context, AsyncCallback callback,
540 uint64_t monotonic_serial_num) {
541 typedef RC read_context_t;
542 typedef PendingReadContext<RC> pending_read_context_t;
543 static_assert(std::is_base_of<value_t, typename read_context_t::value_t>::value,
544 "value_t is not a base class of read_context_t::value_t");
545 static_assert(alignof(value_t) == alignof(typename read_context_t::value_t),
546 "alignof(value_t) != alignof(typename read_context_t::value_t)");
547
548 pending_read_context_t pending_context{ context, callback };
549 OperationStatus internal_status = InternalRead(pending_context);
550 Status status;
551 if(internal_status == OperationStatus::SUCCESS) {
552 status = Status::Ok;
553 } else if(internal_status == OperationStatus::NOT_FOUND) {
554 status = Status::NotFound;
555 } else {
556 assert(internal_status == OperationStatus::RECORD_ON_DISK);
557 bool async;
558 status = HandleOperationStatus(thread_ctx(), pending_context, internal_status, async);
559 }
560 thread_ctx().serial_num = monotonic_serial_num;
561 return status;
562}
563
564template <class K, class V, class D>
565template <class UC>
566inline Status FasterKv<K, V, D>::Upsert(UC& context, AsyncCallback callback,
567 uint64_t monotonic_serial_num) {
568 typedef UC upsert_context_t;
569 typedef PendingUpsertContext<UC> pending_upsert_context_t;
570 static_assert(std::is_base_of<value_t, typename upsert_context_t::value_t>::value,
571 "value_t is not a base class of upsert_context_t::value_t");
572 static_assert(alignof(value_t) == alignof(typename upsert_context_t::value_t),
573 "alignof(value_t) != alignof(typename upsert_context_t::value_t)");
574
575 pending_upsert_context_t pending_context{ context, callback };
576 OperationStatus internal_status = InternalUpsert(pending_context);
577 Status status;
578
579 if(internal_status == OperationStatus::SUCCESS) {
580 status = Status::Ok;
581 } else {
582 bool async;
583 status = HandleOperationStatus(thread_ctx(), pending_context, internal_status, async);
584 }
585 thread_ctx().serial_num = monotonic_serial_num;
586 return status;
587}
588
589template <class K, class V, class D>
590template <class MC>
591inline Status FasterKv<K, V, D>::Rmw(MC& context, AsyncCallback callback,
592 uint64_t monotonic_serial_num) {
593 typedef MC rmw_context_t;
594 typedef PendingRmwContext<MC> pending_rmw_context_t;
595 static_assert(std::is_base_of<value_t, typename rmw_context_t::value_t>::value,
596 "value_t is not a base class of rmw_context_t::value_t");
597 static_assert(alignof(value_t) == alignof(typename rmw_context_t::value_t),
598 "alignof(value_t) != alignof(typename rmw_context_t::value_t)");
599
600 pending_rmw_context_t pending_context{ context, callback };
601 OperationStatus internal_status = InternalRmw(pending_context, false);
602 Status status;
603 if(internal_status == OperationStatus::SUCCESS) {
604 status = Status::Ok;
605 } else {
606 bool async;
607 status = HandleOperationStatus(thread_ctx(), pending_context, internal_status, async);
608 }
609 thread_ctx().serial_num = monotonic_serial_num;
610 return status;
611}
612
613template <class K, class V, class D>
614inline bool FasterKv<K, V, D>::CompletePending(bool wait) {
615 do {
616 disk.TryComplete();
617
618 bool done = true;
619 if(thread_ctx().phase != Phase::WAIT_PENDING && thread_ctx().phase != Phase::IN_PROGRESS) {
620 CompleteIoPendingRequests(thread_ctx());
621 }
622 Refresh();
623 CompleteRetryRequests(thread_ctx());
624
625 done = (thread_ctx().pending_ios.empty() && thread_ctx().retry_requests.empty());
626
627 if(thread_ctx().phase != Phase::REST) {
628 CompleteIoPendingRequests(prev_thread_ctx());
629 Refresh();
630 CompleteRetryRequests(prev_thread_ctx());
631 done = false;
632 }
633 if(done) {
634 return true;
635 }
636 } while(wait);
637 return false;
638}
639
640template <class K, class V, class D>
641inline void FasterKv<K, V, D>::CompleteIoPendingRequests(ExecutionContext& context) {
642 AsyncIOContext* ctxt;
643 // Clear this thread's I/O response queue. (Does not clear I/Os issued by this thread that have
644 // not yet completed.)
645 while(context.io_responses.try_pop(ctxt)) {
646 CallbackContext<AsyncIOContext> io_context{ ctxt };
647 CallbackContext<pending_context_t> pending_context{ io_context->caller_context };
648 // This I/O is no longer pending, since we popped its response off the queue.
649 auto pending_io = context.pending_ios.find(io_context->io_id);
650 assert(pending_io != context.pending_ios.end());
651 context.pending_ios.erase(pending_io);
652
653 // Issue the continue command
654 OperationStatus internal_status;
655 if(pending_context->type == OperationType::Read) {
656 internal_status = InternalContinuePendingRead(context, *io_context.get());
657 } else {
658 assert(pending_context->type == OperationType::RMW);
659 internal_status = InternalContinuePendingRmw(context, *io_context.get());
660 }
661 Status result;
662 if(internal_status == OperationStatus::SUCCESS) {
663 result = Status::Ok;
664 } else if(internal_status == OperationStatus::NOT_FOUND) {
665 result = Status::NotFound;
666 } else {
667 result = HandleOperationStatus(context, *pending_context.get(), internal_status,
668 pending_context.async);
669 }
670 if(!pending_context.async) {
671 pending_context->caller_callback(pending_context->caller_context, result);
672 }
673 }
674}
675
676template <class K, class V, class D>
677inline void FasterKv<K, V, D>::CompleteRetryRequests(ExecutionContext& context) {
678 // If we can't complete a request, it will be pushed back onto the deque. Retry each request
679 // only once.
680 size_t size = context.retry_requests.size();
681 for(size_t idx = 0; idx < size; ++idx) {
682 CallbackContext<pending_context_t> pending_context{ context.retry_requests.front() };
683 context.retry_requests.pop_front();
684 // Issue retry command
685 OperationStatus internal_status;
686 switch(pending_context->type) {
687 case OperationType::RMW:
688 internal_status = InternalRetryPendingRmw(
689 *static_cast<async_pending_rmw_context_t*>(pending_context.get()));
690 break;
691 case OperationType::Upsert:
692 internal_status = InternalUpsert(
693 *static_cast<async_pending_upsert_context_t*>(pending_context.get()));
694 break;
695 default:
696 assert(false);
697 throw std::runtime_error{ "Cannot happen!" };
698 }
699 // Handle operation status
700 Status result;
701 if(internal_status == OperationStatus::SUCCESS) {
702 result = Status::Ok;
703 } else {
704 result = HandleOperationStatus(context, *pending_context.get(), internal_status,
705 pending_context.async);
706 }
707
708 // If done, callback user code.
709 if(!pending_context.async) {
710 pending_context->caller_callback(pending_context->caller_context, result);
711 }
712 }
713}
714
715template <class K, class V, class D>
716template <class C>
717inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const {
718 typedef C pending_read_context_t;
719
720 if(thread_ctx().phase != Phase::REST) {
721 const_cast<faster_t*>(this)->HeavyEnter();
722 }
723
724 const key_t& key = pending_context.key();
725 KeyHash hash = key.GetHash();
726 const AtomicHashBucketEntry* atomic_entry = FindEntry(hash);
727 if(!atomic_entry) {
728 // no record found
729 return OperationStatus::NOT_FOUND;
730 }
731
732 HashBucketEntry entry = atomic_entry->load();
733 Address address = entry.address();
734 Address begin_address = hlog.begin_address.load();
735 Address head_address = hlog.head_address.load();
736 Address safe_read_only_address = hlog.safe_read_only_address.load();
737 Address read_only_address = hlog.read_only_address.load();
738 uint64_t latest_record_version = 0;
739
740 if(address >= head_address) {
741 // Look through the in-memory portion of the log, to find the first record (if any) whose key
742 // matches.
743 const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address));
744 latest_record_version = record->header.checkpoint_version;
745 if(key != record->key()) {
746 address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
747 }
748 }
749
750 switch(thread_ctx().phase) {
751 case Phase::PREPARE:
752 // Reading old version (v).
753 if(latest_record_version > thread_ctx().version) {
754 // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
755 // what we've seen.
756 pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, entry);
757 return OperationStatus::CPR_SHIFT_DETECTED;
758 }
759 break;
760 default:
761 break;
762 }
763
764 if(address >= safe_read_only_address) {
765 // Mutable or fuzzy region
766 // concurrent read
767 pending_context.GetAtomic(hlog.Get(address));
768 return OperationStatus::SUCCESS;
769 } else if(address >= head_address) {
770 // Immutable region
771 // single-thread read
772 pending_context.Get(hlog.Get(address));
773 return OperationStatus::SUCCESS;
774 } else if(address >= begin_address) {
775 // Record not available in-memory
776 pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, entry);
777 return OperationStatus::RECORD_ON_DISK;
778 } else {
779 // No record found
780 return OperationStatus::NOT_FOUND;
781 }
782}
783
784template <class K, class V, class D>
785template <class C>
786inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {
787 typedef C pending_upsert_context_t;
788
789 if(thread_ctx().phase != Phase::REST) {
790 HeavyEnter();
791 }
792
793 const key_t& key = pending_context.key();
794 KeyHash hash = key.GetHash();
795 HashBucketEntry expected_entry;
796 HashBucket* bucket;
797 AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket);
798
799 // (Note that address will be Address::kInvalidAddress, if the atomic_entry was created.)
800 Address address = expected_entry.address();
801 Address head_address = hlog.head_address.load();
802 Address read_only_address = hlog.read_only_address.load();
803 uint64_t latest_record_version = 0;
804
805 if(address >= head_address) {
806 // Multiple keys may share the same hash. Try to find the most recent record with a matching
807 // key that we might be able to update in place.
808 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
809 latest_record_version = record->header.checkpoint_version;
810 if(key != record->key()) {
811 address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
812 }
813 }
814
815 CheckpointLockGuard lock_guard{ checkpoint_locks_, hash };
816
817 // The common case
818 if(thread_ctx().phase == Phase::REST && address >= read_only_address) {
819 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
820 if(pending_context.PutAtomic(record)) {
821 return OperationStatus::SUCCESS;
822 } else {
823 // Must retry as RCU.
824 goto create_record;
825 }
826 }
827
828 // Acquire necessary locks.
829 switch(thread_ctx().phase) {
830 case Phase::PREPARE:
831 // Working on old version (v).
832 if(!lock_guard.try_lock_old()) {
833 pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
834 return OperationStatus::CPR_SHIFT_DETECTED;
835 } else {
836 if(latest_record_version > thread_ctx().version) {
837 // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
838 // what we've seen.
839 pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
840 expected_entry);
841 return OperationStatus::CPR_SHIFT_DETECTED;
842 }
843 }
844 break;
845 case Phase::IN_PROGRESS:
846 // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}.
847 if(latest_record_version < thread_ctx().version) {
848 // Will create new record or update existing record to new version (v+1).
849 if(!lock_guard.try_lock_new()) {
850 pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
851 expected_entry);
852 return OperationStatus::RETRY_LATER;
853 } else {
854 // Update to new version (v+1) requires RCU.
855 goto create_record;
856 }
857 }
858 break;
859 case Phase::WAIT_PENDING:
860 // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}.
861 if(latest_record_version < thread_ctx().version) {
862 if(lock_guard.old_locked()) {
863 pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
864 expected_entry);
865 return OperationStatus::RETRY_LATER;
866 } else {
867 // Update to new version (v+1) requires RCU.
868 goto create_record;
869 }
870 }
871 break;
872 case Phase::WAIT_FLUSH:
873 // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}.
874 if(latest_record_version < thread_ctx().version) {
875 goto create_record;
876 }
877 break;
878 default:
879 break;
880 }
881
882 if(address >= read_only_address) {
883 // Mutable region; try to update in place.
884 if(atomic_entry->load() != expected_entry) {
885 // Some other thread may have RCUed the record before we locked it; try again.
886 return OperationStatus::RETRY_NOW;
887 }
888 // We acquired the necessary locks, so so we can update the record's bucket atomically.
889 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
890 if(pending_context.PutAtomic(record)) {
891 // Host successfully replaced record, atomically.
892 return OperationStatus::SUCCESS;
893 } else {
894 // Must retry as RCU.
895 goto create_record;
896 }
897 }
898
899 // Create a record and attempt RCU.
900create_record:
901 uint32_t record_size = record_t::size(key, pending_context.value_size());
902 Address new_address = BlockAllocate(record_size);
903 record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
904 new(record) record_t{
905 RecordInfo{
906 static_cast<uint16_t>(thread_ctx().version), true, false, false,
907 expected_entry.address() },
908 key };
909 pending_context.Put(record);
910
911 HashBucketEntry updated_entry{ new_address, hash.tag(), false };
912
913 if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) {
914 // Installed the new record in the hash table.
915 return OperationStatus::SUCCESS;
916 } else {
917 // Try again.
918 record->header.invalid = true;
919 return InternalUpsert(pending_context);
920 }
921}
922
923template <class K, class V, class D>
924template <class C>
925inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool retrying) {
926 typedef C pending_rmw_context_t;
927
928 Phase phase = retrying ? pending_context.phase : thread_ctx().phase;
929 uint32_t version = retrying ? pending_context.version : thread_ctx().version;
930
931 if(phase != Phase::REST) {
932 HeavyEnter();
933 }
934
935 const key_t& key = pending_context.key();
936 KeyHash hash = key.GetHash();
937 HashBucketEntry expected_entry;
938 HashBucket* bucket;
939 AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket);
940
941 // (Note that address will be Address::kInvalidAddress, if the atomic_entry was created.)
942 Address address = expected_entry.address();
943 Address begin_address = hlog.begin_address.load();
944 Address head_address = hlog.head_address.load();
945 Address read_only_address = hlog.read_only_address.load();
946 Address safe_read_only_address = hlog.safe_read_only_address.load();
947 uint64_t latest_record_version = 0;
948
949 if(address >= head_address) {
950 // Multiple keys may share the same hash. Try to find the most recent record with a matching
951 // key that we might be able to update in place.
952 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
953 latest_record_version = record->header.checkpoint_version;
954 if(key != record->key()) {
955 address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
956 }
957 }
958
959 CheckpointLockGuard lock_guard{ checkpoint_locks_, hash };
960
961 // The common case.
962 if(phase == Phase::REST && address >= read_only_address) {
963 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
964 if(pending_context.RmwAtomic(record)) {
965 // In-place RMW succeeded.
966 return OperationStatus::SUCCESS;
967 } else {
968 // Must retry as RCU.
969 goto create_record;
970 }
971 }
972
973 // Acquire necessary locks.
974 switch(phase) {
975 case Phase::PREPARE:
976 // Working on old version (v).
977 if(!lock_guard.try_lock_old()) {
978 // If we're retrying the operation, then we already have an old lock, so we'll always
979 // succeed in obtaining a second. Otherwise, another thread has acquired the new lock, so
980 // a CPR shift has occurred.
981 assert(!retrying);
982 pending_context.go_async(phase, version, address, expected_entry);
983 return OperationStatus::CPR_SHIFT_DETECTED;
984 } else {
985 if(latest_record_version > version) {
986 // CPR shift detected: we are in the "PREPARE" phase, and a mutable record has a version
987 // later than what we've seen.
988 assert(!retrying);
989 pending_context.go_async(phase, version, address, expected_entry);
990 return OperationStatus::CPR_SHIFT_DETECTED;
991 }
992 }
993 break;
994 case Phase::IN_PROGRESS:
995 // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}.
996 if(latest_record_version < version) {
997 // Will create new record or update existing record to new version (v+1).
998 if(!lock_guard.try_lock_new()) {
999 if(!retrying) {
1000 pending_context.go_async(phase, version, address, expected_entry);
1001 } else {
1002 pending_context.continue_async(address, expected_entry);
1003 }
1004 return OperationStatus::RETRY_LATER;
1005 } else {
1006 // Update to new version (v+1) requires RCU.
1007 goto create_record;
1008 }
1009 }
1010 break;
1011 case Phase::WAIT_PENDING:
1012 // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}.
1013 if(latest_record_version < version) {
1014 if(lock_guard.old_locked()) {
1015 if(!retrying) {
1016 pending_context.go_async(phase, version, address, expected_entry);
1017 } else {
1018 pending_context.continue_async(address, expected_entry);
1019 }
1020 return OperationStatus::RETRY_LATER;
1021 } else {
1022 // Update to new version (v+1) requires RCU.
1023 goto create_record;
1024 }
1025 }
1026 break;
1027 case Phase::WAIT_FLUSH:
1028 // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}.
1029 if(latest_record_version < version) {
1030 goto create_record;
1031 }
1032 break;
1033 default:
1034 break;
1035 }
1036
1037 if(address >= read_only_address) {
1038 // Mutable region. Try to update in place.
1039 if(atomic_entry->load() != expected_entry) {
1040 // Some other thread may have RCUed the record before we locked it; try again.
1041 return OperationStatus::RETRY_NOW;
1042 }
1043 // We acquired the necessary locks, so so we can update the record's bucket atomically.
1044 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
1045 if(pending_context.RmwAtomic(record)) {
1046 // In-place RMW succeeded.
1047 return OperationStatus::SUCCESS;
1048 } else {
1049 // Must retry as RCU.
1050 goto create_record;
1051 }
1052 } else if(address >= safe_read_only_address) {
1053 // Fuzzy Region: Must go pending due to lost-update anomaly
1054 if(!retrying) {
1055 pending_context.go_async(phase, version, address, expected_entry);
1056 } else {
1057 pending_context.continue_async(address, expected_entry);
1058 }
1059 return OperationStatus::RETRY_LATER;
1060 } else if(address >= head_address) {
1061 goto create_record;
1062 } else if(address >= begin_address) {
1063 // Need to obtain old record from disk.
1064 if(!retrying) {
1065 pending_context.go_async(phase, version, address, expected_entry);
1066 } else {
1067 pending_context.continue_async(address, expected_entry);
1068 }
1069 return OperationStatus::RECORD_ON_DISK;
1070 } else {
1071 // Create a new record.
1072 goto create_record;
1073 }
1074
1075 // Create a record and attempt RCU.
1076create_record:
1077 uint32_t record_size = record_t::size(key, pending_context.value_size());
1078 Address new_address = BlockAllocate(record_size);
1079 record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));
1080
1081 // Allocating a block may have the side effect of advancing the head address.
1082 head_address = hlog.head_address.load();
1083 // Allocating a block may have the side effect of advancing the thread context's version and
1084 // phase.
1085 if(!retrying) {
1086 phase = thread_ctx().phase;
1087 version = thread_ctx().version;
1088 }
1089
1090 new(new_record) record_t{
1091 RecordInfo{
1092 static_cast<uint16_t>(version), true, false, false,
1093 expected_entry.address() },
1094 key };
1095 if(address < hlog.begin_address.load()) {
1096 pending_context.RmwInitial(new_record);
1097 } else if(address >= head_address) {
1098 const record_t* old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
1099 pending_context.RmwCopy(old_record, new_record);
1100 } else {
1101 // The block we allocated for the new record caused the head address to advance beyond
1102 // the old record. Need to obtain the old record from disk.
1103 new_record->header.invalid = true;
1104 if(!retrying) {
1105 pending_context.go_async(phase, version, address, expected_entry);
1106 } else {
1107 pending_context.continue_async(address, expected_entry);
1108 }
1109 return OperationStatus::RECORD_ON_DISK;
1110 }
1111
1112 HashBucketEntry updated_entry{ new_address, hash.tag(), false };
1113 if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) {
1114 return OperationStatus::SUCCESS;
1115 } else {
1116 // CAS failed; try again.
1117 new_record->header.invalid = true;
1118 if(!retrying) {
1119 pending_context.go_async(phase, version, address, expected_entry);
1120 } else {
1121 pending_context.continue_async(address, expected_entry);
1122 }
1123 return OperationStatus::RETRY_NOW;
1124 }
1125}
1126
1127template <class K, class V, class D>
1128inline OperationStatus FasterKv<K, V, D>::InternalRetryPendingRmw(
1129 async_pending_rmw_context_t& pending_context) {
1130 OperationStatus status = InternalRmw(pending_context, true);
1131 if(status == OperationStatus::SUCCESS && pending_context.version != thread_ctx().version) {
1132 status = OperationStatus::SUCCESS_UNMARK;
1133 }
1134 return status;
1135}
1136
1137template <class K, class V, class D>
1138inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address,
1139 Address min_offset) const {
1140 while(from_address >= min_offset) {
1141 const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address));
1142 if(key == record->key()) {
1143 return from_address;
1144 } else {
1145 from_address = record->header.previous_address();
1146 continue;
1147 }
1148 }
1149 return from_address;
1150}
1151
1152template <class K, class V, class D>
1153inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx,
1154 pending_context_t& pending_context, OperationStatus internal_status, bool& async) {
1155 async = false;
1156 switch(internal_status) {
1157 case OperationStatus::RETRY_NOW:
1158 switch(pending_context.type) {
1159 case OperationType::Read: {
1160 async_pending_read_context_t& read_context =
1161 *static_cast<async_pending_read_context_t*>(&pending_context);
1162 internal_status = InternalRead(read_context);
1163 break;
1164 }
1165 case OperationType::Upsert: {
1166 async_pending_upsert_context_t& upsert_context =
1167 *static_cast<async_pending_upsert_context_t*>(&pending_context);
1168 internal_status = InternalUpsert(upsert_context);
1169 break;
1170 }
1171 case OperationType::RMW: {
1172 async_pending_rmw_context_t& rmw_context =
1173 *static_cast<async_pending_rmw_context_t*>(&pending_context);
1174 internal_status = InternalRmw(rmw_context, false);
1175 break;
1176 }
1177 }
1178
1179 if(internal_status == OperationStatus::SUCCESS) {
1180 return Status::Ok;
1181 } else {
1182 return HandleOperationStatus(ctx, pending_context, internal_status, async);
1183 }
1184 case OperationStatus::RETRY_LATER:
1185 if(thread_ctx().phase == Phase::PREPARE) {
1186 assert(pending_context.type == OperationType::RMW);
1187 // Can I be marking an operation again and again?
1188 if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) {
1189 return PivotAndRetry(ctx, pending_context, async);
1190 }
1191 }
1192 return RetryLater(ctx, pending_context, async);
1193 case OperationStatus::RECORD_ON_DISK:
1194 if(thread_ctx().phase == Phase::PREPARE) {
1195 assert(pending_context.type == OperationType::Read ||
1196 pending_context.type == OperationType::RMW);
1197 // Can I be marking an operation again and again?
1198 if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) {
1199 return PivotAndRetry(ctx, pending_context, async);
1200 }
1201 }
1202 return IssueAsyncIoRequest(ctx, pending_context, async);
1203 case OperationStatus::SUCCESS_UNMARK:
1204 checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old();
1205 return Status::Ok;
1206 case OperationStatus::NOT_FOUND_UNMARK:
1207 checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old();
1208 return Status::NotFound;
1209 case OperationStatus::CPR_SHIFT_DETECTED:
1210 return PivotAndRetry(ctx, pending_context, async);
1211 }
1212 // not reached
1213 assert(false);
1214 return Status::Corruption;
1215}
1216
1217template <class K, class V, class D>
1218inline Status FasterKv<K, V, D>::PivotAndRetry(ExecutionContext& ctx,
1219 pending_context_t& pending_context, bool& async) {
1220 // Some invariants
1221 assert(ctx.version == thread_ctx().version);
1222 assert(thread_ctx().phase == Phase::PREPARE);
1223 Refresh();
1224 // thread must have moved to IN_PROGRESS phase
1225 assert(thread_ctx().version == ctx.version + 1);
1226 // retry with new contexts
1227 pending_context.phase = thread_ctx().phase;
1228 pending_context.version = thread_ctx().version;
1229 return HandleOperationStatus(thread_ctx(), pending_context, OperationStatus::RETRY_NOW, async);
1230}
1231
1232template <class K, class V, class D>
1233inline Status FasterKv<K, V, D>::RetryLater(ExecutionContext& ctx,
1234 pending_context_t& pending_context, bool& async) {
1235 IAsyncContext* context_copy;
1236 Status result = pending_context.DeepCopy(context_copy);
1237 if(result == Status::Ok) {
1238 async = true;
1239 ctx.retry_requests.push_back(context_copy);
1240 return Status::Pending;
1241 } else {
1242 async = false;
1243 return result;
1244 }
1245}
1246
1247template <class K, class V, class D>
1248inline constexpr uint32_t FasterKv<K, V, D>::MinIoRequestSize() const {
1249 return static_cast<uint32_t>(
1250 sizeof(value_t) + pad_alignment(record_t::min_disk_key_size(),
1251 alignof(value_t)));
1252}
1253
1254template <class K, class V, class D>
1255inline Status FasterKv<K, V, D>::IssueAsyncIoRequest(ExecutionContext& ctx,
1256 pending_context_t& pending_context, bool& async) {
1257 // Issue asynchronous I/O request
1258 uint64_t io_id = thread_ctx().io_id++;
1259 thread_ctx().pending_ios.insert({ io_id, pending_context.key().GetHash() });
1260 async = true;
1261 AsyncIOContext io_request{ this, pending_context.address, &pending_context,
1262 &thread_ctx().io_responses, io_id };
1263 AsyncGetFromDisk(pending_context.address, MinIoRequestSize(), AsyncGetFromDiskCallback,
1264 io_request);
1265 return Status::Pending;
1266}
1267
1268template <class K, class V, class D>
1269inline Address FasterKv<K, V, D>::BlockAllocate(uint32_t record_size) {
1270 uint32_t page;
1271 Address retval = hlog.Allocate(record_size, page);
1272 while(retval < hlog.read_only_address.load()) {
1273 Refresh();
1274 // Don't overrun the hlog's tail offset.
1275 bool page_closed = (retval == Address::kInvalidAddress);
1276 while(page_closed) {
1277 page_closed = !hlog.NewPage(page);
1278 Refresh();
1279 }
1280 retval = hlog.Allocate(record_size, page);
1281 }
1282 return retval;
1283}
1284
1285template <class K, class V, class D>
1286void FasterKv<K, V, D>::AsyncGetFromDisk(Address address, uint32_t num_records,
1287 AsyncIOCallback callback, AsyncIOContext& context) {
1288 if(epoch_.IsProtected()) {
1289 /// Throttling. (Thread pool, unprotected threads are not throttled.)
1290 while(num_pending_ios.load() > 120) {
1291 disk.TryComplete();
1292 std::this_thread::yield();
1293 epoch_.ProtectAndDrain();
1294 }
1295 }
1296 ++num_pending_ios;
1297 hlog.AsyncGetFromDisk(address, num_records, callback, context);
1298}
1299
1300template <class K, class V, class D>
1301void FasterKv<K, V, D>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result,
1302 size_t bytes_transferred) {
1303 CallbackContext<AsyncIOContext> context{ ctxt };
1304 faster_t* faster = reinterpret_cast<faster_t*>(context->faster);
1305 /// Context stack is: AsyncIOContext, PendingContext.
1306 pending_context_t* pending_context = static_cast<pending_context_t*>(context->caller_context);
1307
1308 /// This I/O is finished.
1309 --faster->num_pending_ios;
1310 /// Always "goes async": context is freed by the issuing thread, when processing thread I/O
1311 /// responses.
1312 context.async = true;
1313
1314 pending_context->result = result;
1315 if(result == Status::Ok) {
1316 record_t* record = reinterpret_cast<record_t*>(context->record.GetValidPointer());
1317 // Size of the record we read from disk (might not have read the entire record, yet).
1318 size_t record_size = context->record.available_bytes;
1319 if(record->min_disk_key_size() > record_size) {
1320 // Haven't read the full record in yet; I/O is not complete!
1321 faster->AsyncGetFromDisk(context->address, record->min_disk_key_size(),
1322 AsyncGetFromDiskCallback, *context.get());
1323 context.async = true;
1324 } else if(record->min_disk_value_size() > record_size) {
1325 // Haven't read the full record in yet; I/O is not complete!
1326 faster->AsyncGetFromDisk(context->address, record->min_disk_value_size(),
1327 AsyncGetFromDiskCallback, *context.get());
1328 context.async = true;
1329 } else if(record->disk_size() > record_size) {
1330 // Haven't read the full record in yet; I/O is not complete!
1331 faster->AsyncGetFromDisk(context->address, record->disk_size(),
1332 AsyncGetFromDiskCallback, *context.get());
1333 context.async = true;
1334 } else if(pending_context->key() == record->key()) {
1335 //The keys are same, so I/O is complete
1336 context->thread_io_responses->push(context.get());
1337 } else {
1338 //keys are not same. I/O is not complete
1339 context->address = record->header.previous_address();
1340 if(context->address >= faster->hlog.begin_address.load()) {
1341 faster->AsyncGetFromDisk(context->address, faster->MinIoRequestSize(),
1342 AsyncGetFromDiskCallback, *context.get());
1343 context.async = true;
1344 } else {
1345 // Record not found, so I/O is complete.
1346 context->thread_io_responses->push(context.get());
1347 }
1348 }
1349 }
1350}
1351
1352template <class K, class V, class D>
1353OperationStatus FasterKv<K, V, D>::InternalContinuePendingRead(ExecutionContext& context,
1354 AsyncIOContext& io_context) {
1355 if(io_context.address >= hlog.begin_address.load()) {
1356 async_pending_read_context_t* pending_context = static_cast<async_pending_read_context_t*>(
1357 io_context.caller_context);
1358 record_t* record = reinterpret_cast<record_t*>(io_context.record.GetValidPointer());
1359 pending_context->Get(record);
1360 assert(!kCopyReadsToTail);
1361 return (thread_ctx().version > context.version) ? OperationStatus::SUCCESS_UNMARK :
1362 OperationStatus::SUCCESS;
1363 } else {
1364 return (thread_ctx().version > context.version) ? OperationStatus::NOT_FOUND_UNMARK :
1365 OperationStatus::NOT_FOUND;
1366 }
1367}
1368
1369template <class K, class V, class D>
1370OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& context,
1371 AsyncIOContext& io_context) {
1372 async_pending_rmw_context_t* pending_context = static_cast<async_pending_rmw_context_t*>(
1373 io_context.caller_context);
1374
1375 // Find a hash bucket entry to store the updated value in.
1376 const key_t& key = pending_context->key();
1377 KeyHash hash = key.GetHash();
1378 HashBucketEntry expected_entry;
1379 HashBucket* bucket;
1380 AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket);
1381
1382 // (Note that address will be Address::kInvalidAddress, if the atomic_entry was created.)
1383 Address address = expected_entry.address();
1384 Address head_address = hlog.head_address.load();
1385
1386 // Make sure that atomic_entry is OK to update.
1387 if(address >= head_address) {
1388 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
1389 if(key != record->key()) {
1390 address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address);
1391 }
1392 }
1393
1394 if(address > pending_context->entry.address()) {
1395 // We can't trace the current hash bucket entry back to the record we read.
1396 pending_context->continue_async(address, expected_entry);
1397 return OperationStatus::RETRY_NOW;
1398 }
1399 assert(address < hlog.begin_address.load() || address == pending_context->entry.address());
1400
1401 // We have to do copy-on-write/RCU and write the updated value to the tail of the log.
1402 uint32_t record_size = record_t::size(key, pending_context->value_size());
1403 Address new_address = BlockAllocate(record_size);
1404 record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));
1405
1406 new(new_record) record_t{
1407 RecordInfo{
1408 static_cast<uint16_t>(context.version), true, false, false,
1409 expected_entry.address() },
1410 key };
1411 if(io_context.address < hlog.begin_address.load()) {
1412 // The on-disk trace back failed to find a key match.
1413 pending_context->RmwInitial(new_record);
1414 } else {
1415 // The record we read from disk.
1416 const record_t* disk_record = reinterpret_cast<const record_t*>(
1417 io_context.record.GetValidPointer());
1418 pending_context->RmwCopy(disk_record, new_record);
1419 }
1420
1421 HashBucketEntry updated_entry{ new_address, hash.tag(), false };
1422 if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) {
1423 assert(thread_ctx().version >= context.version);
1424 return (thread_ctx().version == context.version) ? OperationStatus::SUCCESS :
1425 OperationStatus::SUCCESS_UNMARK;
1426 } else {
1427 // CAS failed; try again.
1428 new_record->header.invalid = true;
1429 pending_context->continue_async(address, expected_entry);
1430 return OperationStatus::RETRY_NOW;
1431 }
1432}
1433
1434template <class K, class V, class D>
1435void FasterKv<K, V, D>::InitializeCheckpointLocks() {
1436 uint32_t table_version = resize_info_.version;
1437 uint64_t size = state_[table_version].size();
1438 checkpoint_locks_.Initialize(size);
1439}
1440
1441template <class K, class V, class D>
1442Status FasterKv<K, V, D>::WriteIndexMetadata() {
1443 std::string filename = disk.index_checkpoint_path(checkpoint_.index_token) + "info.dat";
1444 // (This code will need to be refactored into the disk_t interface, if we want to support
1445 // unformatted disks.)
1446 std::FILE* file = std::fopen(filename.c_str(), "wb");
1447 if(!file) {
1448 return Status::IOError;
1449 }
1450 if(std::fwrite(&checkpoint_.index_metadata, sizeof(checkpoint_.index_metadata), 1, file) != 1) {
1451 std::fclose(file);
1452 return Status::IOError;
1453 }
1454 if(std::fclose(file) != 0) {
1455 return Status::IOError;
1456 }
1457 return Status::Ok;
1458}
1459
1460template <class K, class V, class D>
1461Status FasterKv<K, V, D>::ReadIndexMetadata(const Guid& token) {
1462 std::string filename = disk.index_checkpoint_path(token) + "info.dat";
1463 // (This code will need to be refactored into the disk_t interface, if we want to support
1464 // unformatted disks.)
1465 std::FILE* file = std::fopen(filename.c_str(), "rb");
1466 if(!file) {
1467 return Status::IOError;
1468 }
1469 if(std::fread(&checkpoint_.index_metadata, sizeof(checkpoint_.index_metadata), 1, file) != 1) {
1470 std::fclose(file);
1471 return Status::IOError;
1472 }
1473 if(std::fclose(file) != 0) {
1474 return Status::IOError;
1475 }
1476 return Status::Ok;
1477}
1478
1479template <class K, class V, class D>
1480Status FasterKv<K, V, D>::WriteCprMetadata() {
1481 std::string filename = disk.cpr_checkpoint_path(checkpoint_.hybrid_log_token) + "info.dat";
1482 // (This code will need to be refactored into the disk_t interface, if we want to support
1483 // unformatted disks.)
1484 std::FILE* file = std::fopen(filename.c_str(), "wb");
1485 if(!file) {
1486 return Status::IOError;
1487 }
1488 if(std::fwrite(&checkpoint_.log_metadata, sizeof(checkpoint_.log_metadata), 1, file) != 1) {
1489 std::fclose(file);
1490 return Status::IOError;
1491 }
1492 if(std::fclose(file) != 0) {
1493 return Status::IOError;
1494 }
1495 return Status::Ok;
1496}
1497
1498template <class K, class V, class D>
1499Status FasterKv<K, V, D>::ReadCprMetadata(const Guid& token) {
1500 std::string filename = disk.cpr_checkpoint_path(token) + "info.dat";
1501 // (This code will need to be refactored into the disk_t interface, if we want to support
1502 // unformatted disks.)
1503 std::FILE* file = std::fopen(filename.c_str(), "rb");
1504 if(!file) {
1505 return Status::IOError;
1506 }
1507 if(std::fread(&checkpoint_.log_metadata, sizeof(checkpoint_.log_metadata), 1, file) != 1) {
1508 std::fclose(file);
1509 return Status::IOError;
1510 }
1511 if(std::fclose(file) != 0) {
1512 return Status::IOError;
1513 }
1514 return Status::Ok;
1515}
1516
1517template <class K, class V, class D>
1518Status FasterKv<K, V, D>::WriteCprContext() {
1519 std::string filename = disk.cpr_checkpoint_path(checkpoint_.hybrid_log_token);
1520 const Guid& guid = prev_thread_ctx().guid;
1521 filename += guid.ToString();
1522 filename += ".dat";
1523 // (This code will need to be refactored into the disk_t interface, if we want to support
1524 // unformatted disks.)
1525 std::FILE* file = std::fopen(filename.c_str(), "wb");
1526 if(!file) {
1527 return Status::IOError;
1528 }
1529 if(std::fwrite(static_cast<PersistentExecContext*>(&prev_thread_ctx()),
1530 sizeof(PersistentExecContext), 1, file) != 1) {
1531 std::fclose(file);
1532 return Status::IOError;
1533 }
1534 if(std::fclose(file) != 0) {
1535 return Status::IOError;
1536 }
1537 return Status::Ok;
1538}
1539
1540template <class K, class V, class D>
1541Status FasterKv<K, V, D>::ReadCprContexts(const Guid& token, const Guid* guids) {
1542 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
1543 const Guid& guid = guids[idx];
1544 if(guid == Guid{}) {
1545 continue;
1546 }
1547 std::string filename = disk.cpr_checkpoint_path(token);
1548 filename += guid.ToString();
1549 filename += ".dat";
1550 // (This code will need to be refactored into the disk_t interface, if we want to support
1551 // unformatted disks.)
1552 std::FILE* file = std::fopen(filename.c_str(), "rb");
1553 if(!file) {
1554 return Status::IOError;
1555 }
1556 PersistentExecContext context{};
1557 if(std::fread(&context, sizeof(PersistentExecContext), 1, file) != 1) {
1558 std::fclose(file);
1559 return Status::IOError;
1560 }
1561 if(std::fclose(file) != 0) {
1562 return Status::IOError;
1563 }
1564 auto result = checkpoint_.continue_tokens.insert({ context.guid, context.serial_num });
1565 assert(result.second);
1566 }
1567 if(checkpoint_.continue_tokens.size() != checkpoint_.log_metadata.num_threads) {
1568 return Status::Corruption;
1569 } else {
1570 return Status::Ok;
1571 }
1572}
1573
1574template <class K, class V, class D>
1575Status FasterKv<K, V, D>::CheckpointFuzzyIndex() {
1576 uint32_t hash_table_version = resize_info_.version;
1577 // Checkpoint the main hash table.
1578 file_t ht_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) +
1579 "ht.dat");
1580 RETURN_NOT_OK(ht_file.Open(&disk.handler()));
1581 RETURN_NOT_OK(state_[hash_table_version].Checkpoint(disk, std::move(ht_file),
1582 checkpoint_.index_metadata.num_ht_bytes));
1583 // Checkpoint the hash table's overflow buckets.
1584 file_t ofb_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) +
1585 "ofb.dat");
1586 RETURN_NOT_OK(ofb_file.Open(&disk.handler()));
1587 RETURN_NOT_OK(overflow_buckets_allocator_[hash_table_version].Checkpoint(disk,
1588 std::move(ofb_file), checkpoint_.index_metadata.num_ofb_bytes));
1589 checkpoint_.index_checkpoint_started = true;
1590 return Status::Ok;
1591}
1592
1593template <class K, class V, class D>
1594Status FasterKv<K, V, D>::CheckpointFuzzyIndexComplete() {
1595 if(!checkpoint_.index_checkpoint_started) {
1596 return Status::Pending;
1597 }
1598 uint32_t hash_table_version = resize_info_.version;
1599 Status result = state_[hash_table_version].CheckpointComplete(false);
1600 if(result == Status::Pending) {
1601 return Status::Pending;
1602 } else if(result != Status::Ok) {
1603 return result;
1604 } else {
1605 return overflow_buckets_allocator_[hash_table_version].CheckpointComplete(false);
1606 }
1607}
1608
1609template <class K, class V, class D>
1610Status FasterKv<K, V, D>::RecoverFuzzyIndex() {
1611 uint8_t hash_table_version = resize_info_.version;
1612 assert(state_[hash_table_version].size() == checkpoint_.index_metadata.table_size);
1613
1614 // Recover the main hash table.
1615 file_t ht_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) +
1616 "ht.dat");
1617 RETURN_NOT_OK(ht_file.Open(&disk.handler()));
1618 RETURN_NOT_OK(state_[hash_table_version].Recover(disk, std::move(ht_file),
1619 checkpoint_.index_metadata.num_ht_bytes));
1620 // Recover the hash table's overflow buckets.
1621 file_t ofb_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) +
1622 "ofb.dat");
1623 RETURN_NOT_OK(ofb_file.Open(&disk.handler()));
1624 return overflow_buckets_allocator_[hash_table_version].Recover(disk, std::move(ofb_file),
1625 checkpoint_.index_metadata.num_ofb_bytes, checkpoint_.index_metadata.ofb_count);
1626}
1627
1628template <class K, class V, class D>
1629Status FasterKv<K, V, D>::RecoverFuzzyIndexComplete(bool wait) {
1630 uint8_t hash_table_version = resize_info_.version;
1631 Status result = state_[hash_table_version].RecoverComplete(true);
1632 if(result != Status::Ok) {
1633 return result;
1634 }
1635 result = overflow_buckets_allocator_[hash_table_version].RecoverComplete(true);
1636 if(result != Status::Ok) {
1637 return result;
1638 }
1639
1640 // Clear all tentative entries.
1641 for(uint64_t bucket_idx = 0; bucket_idx < state_[hash_table_version].size(); ++bucket_idx) {
1642 HashBucket* bucket = &state_[hash_table_version].bucket(bucket_idx);
1643 while(true) {
1644 for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
1645 if(bucket->entries[entry_idx].load().tentative()) {
1646 bucket->entries[entry_idx].store(HashBucketEntry::kInvalidEntry);
1647 }
1648 }
1649 // Go to next bucket in the chain
1650 HashBucketOverflowEntry entry = bucket->overflow_entry.load();
1651 if(entry.unused()) {
1652 // No more buckets in the chain.
1653 break;
1654 }
1655 bucket = &overflow_buckets_allocator_[hash_table_version].Get(entry.address());
1656 assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0);
1657 }
1658 }
1659 return Status::Ok;
1660}
1661
1662template <class K, class V, class D>
1663Status FasterKv<K, V, D>::RecoverHybridLog() {
1664 class Context : public IAsyncContext {
1665 public:
1666 Context(hlog_t& hlog_, uint32_t page_, RecoveryStatus& recovery_status_)
1667 : hlog{ &hlog_}
1668 , page{ page_ }
1669 , recovery_status{ &recovery_status_ } {
1670 }
1671 /// The deep-copy constructor
1672 Context(const Context& other)
1673 : hlog{ other.hlog }
1674 , page{ other.page }
1675 , recovery_status{ other.recovery_status } {
1676 }
1677 protected:
1678 Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
1679 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1680 }
1681 public:
1682 hlog_t* hlog;
1683 uint32_t page;
1684 RecoveryStatus* recovery_status;
1685 };
1686
1687 auto callback = [](IAsyncContext* ctxt, Status result) {
1688 CallbackContext<Context> context{ ctxt };
1689 result = context->hlog->AsyncReadPagesFromLog(context->page, 1, *context->recovery_status);
1690 };
1691
1692 Address from_address = checkpoint_.index_metadata.checkpoint_start_address;
1693 Address to_address = checkpoint_.log_metadata.final_address;
1694
1695 uint32_t start_page = from_address.page();
1696 uint32_t end_page = to_address.offset() > 0 ? to_address.page() + 1 : to_address.page();
1697 uint32_t capacity = hlog.buffer_size();
1698 RecoveryStatus recovery_status{ start_page, end_page };
1699 // Initially issue read request for all pages that can be held in memory
1700 uint32_t total_pages_to_read = end_page - start_page;
1701 uint32_t pages_to_read_first = std::min(capacity, total_pages_to_read);
1702 RETURN_NOT_OK(hlog.AsyncReadPagesFromLog(start_page, pages_to_read_first, recovery_status));
1703
1704 for(uint32_t page = start_page; page < end_page; ++page) {
1705 while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) {
1706 disk.TryComplete();
1707 std::this_thread::sleep_for(10ms);
1708 }
1709
1710 // handle start and end at non-page boundaries
1711 RETURN_NOT_OK(RecoverFromPage(page == start_page ? from_address : Address{ page, 0 },
1712 page + 1 == end_page ? to_address :
1713 Address{ page, Address::kMaxOffset }));
1714
1715 // OS thread flushes current page and issues a read request if necessary
1716 if(page + capacity < end_page) {
1717 Context context{ hlog, page + capacity, recovery_status };
1718 RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, callback, &context));
1719 } else {
1720 RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, nullptr, nullptr));
1721 }
1722 }
1723 // Wait until all pages have been flushed
1724 for(uint32_t page = start_page; page < end_page; ++page) {
1725 while(recovery_status.page_status(page) != PageRecoveryStatus::FlushDone) {
1726 disk.TryComplete();
1727 std::this_thread::sleep_for(10ms);
1728 }
1729 }
1730 return Status::Ok;
1731}
1732
1733template <class K, class V, class D>
1734Status FasterKv<K, V, D>::RecoverHybridLogFromSnapshotFile() {
1735 class Context : public IAsyncContext {
1736 public:
1737 Context(hlog_t& hlog_, file_t& file_, uint32_t file_start_page_, uint32_t page_,
1738 RecoveryStatus& recovery_status_)
1739 : hlog{ &hlog_ }
1740 , file{ &file_ }
1741 , file_start_page{ file_start_page_ }
1742 , page{ page_ }
1743 , recovery_status{ &recovery_status_ } {
1744 }
1745 /// The deep-copy constructor
1746 Context(const Context& other)
1747 : hlog{ other.hlog }
1748 , file{ other.file }
1749 , file_start_page{ other.file_start_page }
1750 , page{ other.page }
1751 , recovery_status{ other.recovery_status } {
1752 }
1753 protected:
1754 Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
1755 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1756 }
1757 public:
1758 hlog_t* hlog;
1759 file_t* file;
1760 uint32_t file_start_page;
1761 uint32_t page;
1762 RecoveryStatus* recovery_status;
1763 };
1764
1765 auto callback = [](IAsyncContext* ctxt, Status result) {
1766 CallbackContext<Context> context{ ctxt };
1767 result = context->hlog->AsyncReadPagesFromSnapshot(*context->file,
1768 context->file_start_page, context->page, 1, *context->recovery_status);
1769 };
1770
1771 Address file_start_address = checkpoint_.log_metadata.flushed_address;
1772 Address from_address = checkpoint_.index_metadata.checkpoint_start_address;
1773 Address to_address = checkpoint_.log_metadata.final_address;
1774
1775 uint32_t start_page = file_start_address.page();
1776 uint32_t end_page = to_address.offset() > 0 ? to_address.page() + 1 : to_address.page();
1777 uint32_t capacity = hlog.buffer_size();
1778 RecoveryStatus recovery_status{ start_page, end_page };
1779 checkpoint_.snapshot_file = disk.NewFile(disk.relative_cpr_checkpoint_path(
1780 checkpoint_.hybrid_log_token) + "snapshot.dat");
1781 RETURN_NOT_OK(checkpoint_.snapshot_file.Open(&disk.handler()));
1782
1783 // Initially issue read request for all pages that can be held in memory
1784 uint32_t total_pages_to_read = end_page - start_page;
1785 uint32_t pages_to_read_first = std::min(capacity, total_pages_to_read);
1786 RETURN_NOT_OK(hlog.AsyncReadPagesFromSnapshot(checkpoint_.snapshot_file, start_page, start_page,
1787 pages_to_read_first, recovery_status));
1788
1789 for(uint32_t page = start_page; page < end_page; ++page) {
1790 while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) {
1791 disk.TryComplete();
1792 std::this_thread::sleep_for(10ms);
1793 }
1794
1795 // Perform recovery if page in fuzzy portion of the log
1796 if(Address{ page + 1, 0 } > from_address) {
1797 // handle start and end at non-page boundaries
1798 RETURN_NOT_OK(RecoverFromPage(page == from_address.page() ? from_address :
1799 Address{ page, 0 },
1800 page + 1 == end_page ? to_address :
1801 Address{ page, Address::kMaxOffset }));
1802 }
1803
1804 // OS thread flushes current page and issues a read request if necessary
1805 if(page + capacity < end_page) {
1806 Context context{ hlog, checkpoint_.snapshot_file, start_page, page + capacity,
1807 recovery_status };
1808 RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, callback, &context));
1809 } else {
1810 RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, nullptr, nullptr));
1811 }
1812 }
1813 // Wait until all pages have been flushed
1814 for(uint32_t page = start_page; page < end_page; ++page) {
1815 while(recovery_status.page_status(page) != PageRecoveryStatus::FlushDone) {
1816 disk.TryComplete();
1817 std::this_thread::sleep_for(10ms);
1818 }
1819 }
1820 return Status::Ok;
1821}
1822
1823template <class K, class V, class D>
1824Status FasterKv<K, V, D>::RecoverFromPage(Address from_address, Address to_address) {
1825 assert(from_address.page() == to_address.page());
1826 for(Address address = from_address; address < to_address;) {
1827 record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
1828 if(record->header.IsNull()) {
1829 address += sizeof(record->header);
1830 continue;
1831 }
1832 if(record->header.invalid) {
1833 address += record->size();
1834 continue;
1835 }
1836 const key_t& key = record->key();
1837 KeyHash hash = key.GetHash();
1838 HashBucketEntry expected_entry;
1839 HashBucket* bucket;
1840 AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket);
1841
1842 if(record->header.checkpoint_version <= checkpoint_.log_metadata.version) {
1843 HashBucketEntry new_entry{ address, hash.tag(), false };
1844 atomic_entry->store(new_entry);
1845 } else {
1846 record->header.invalid = true;
1847 if(record->header.previous_address() < checkpoint_.index_metadata.checkpoint_start_address) {
1848 HashBucketEntry new_entry{ record->header.previous_address(), hash.tag(), false };
1849 atomic_entry->store(new_entry);
1850 }
1851 }
1852 address += record->size();
1853 }
1854
1855 return Status::Ok;
1856}
1857
1858template <class K, class V, class D>
1859Status FasterKv<K, V, D>::RestoreHybridLog() {
1860 Address tail_address = checkpoint_.log_metadata.final_address;
1861 uint32_t end_page = tail_address.offset() > 0 ? tail_address.page() + 1 : tail_address.page();
1862 uint32_t capacity = hlog.buffer_size();
1863 // Restore as much of the log as will fit in memory.
1864 uint32_t start_page;
1865 if(end_page < capacity - hlog.kNumHeadPages) {
1866 start_page = 0;
1867 } else {
1868 start_page = end_page - (capacity - hlog.kNumHeadPages);
1869 }
1870 RecoveryStatus recovery_status{ start_page, end_page };
1871
1872 uint32_t num_pages = end_page - start_page;
1873 RETURN_NOT_OK(hlog.AsyncReadPagesFromLog(start_page, num_pages, recovery_status));
1874
1875 // Wait until all pages have been read.
1876 for(uint32_t page = start_page; page < end_page; ++page) {
1877 while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) {
1878 disk.TryComplete();
1879 std::this_thread::sleep_for(10ms);
1880 }
1881 }
1882 // Skip the null page.
1883 Address head_address = start_page == 0 ? Address{ 0, Constants::kCacheLineBytes } :
1884 Address{ start_page, 0 };
1885 hlog.RecoveryReset(checkpoint_.index_metadata.log_begin_address, head_address, tail_address);
1886 return Status::Ok;
1887}
1888
1889template <class K, class V, class D>
1890void FasterKv<K, V, D>::HeavyEnter() {
1891 if(thread_ctx().phase == Phase::GC_IO_PENDING || thread_ctx().phase == Phase::GC_IN_PROGRESS) {
1892 CleanHashTableBuckets();
1893 return;
1894 }
1895 while(thread_ctx().phase == Phase::GROW_PREPARE) {
1896 // We spin-wait as a simplification
1897 // Could instead do a "heavy operation" here
1898 std::this_thread::yield();
1899 Refresh();
1900 }
1901 if(thread_ctx().phase == Phase::GROW_IN_PROGRESS) {
1902 SplitHashTableBuckets();
1903 }
1904}
1905
1906template <class K, class V, class D>
1907bool FasterKv<K, V, D>::CleanHashTableBuckets() {
1908 uint64_t chunk = gc_.next_chunk++;
1909 if(chunk >= gc_.num_chunks) {
1910 // No chunk left to clean.
1911 return false;
1912 }
1913 uint8_t version = resize_info_.version;
1914 Address begin_address = hlog.begin_address.load();
1915 uint64_t upper_bound;
1916 if(chunk + 1 < grow_.num_chunks) {
1917 // All chunks but the last chunk contain kGrowHashTableChunkSize elements.
1918 upper_bound = kGrowHashTableChunkSize;
1919 } else {
1920 // Last chunk might contain more or fewer elements.
1921 upper_bound = state_[version].size() - (chunk * kGcHashTableChunkSize);
1922 }
1923 for(uint64_t idx = 0; idx < upper_bound; ++idx) {
1924 HashBucket* bucket = &state_[version].bucket(chunk * kGcHashTableChunkSize + idx);
1925 while(true) {
1926 for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
1927 AtomicHashBucketEntry& atomic_entry = bucket->entries[entry_idx];
1928 HashBucketEntry expected_entry = atomic_entry.load();
1929 if(!expected_entry.unused() && expected_entry.address() != Address::kInvalidAddress &&
1930 expected_entry.address() < begin_address) {
1931 // The record that this entry points to was truncated; try to delete the entry.
1932 atomic_entry.compare_exchange_strong(expected_entry, HashBucketEntry::kInvalidEntry);
1933 // If deletion failed, then some other thread must have added a new record to the entry.
1934 }
1935 }
1936 // Go to next bucket in the chain.
1937 HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load();
1938 if(overflow_entry.unused()) {
1939 // No more buckets in the chain.
1940 break;
1941 }
1942 bucket = &overflow_buckets_allocator_[version].Get(overflow_entry.address());
1943 }
1944 }
1945 // Done with this chunk--did some work.
1946 return true;
1947}
1948
1949template <class K, class V, class D>
1950void FasterKv<K, V, D>::AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version,
1951 HashBucketEntry entry) {
1952 if(next_idx == HashBucket::kNumEntries) {
1953 // Need to allocate a new bucket, first.
1954 FixedPageAddress new_bucket_addr = overflow_buckets_allocator_[version].Allocate();
1955 HashBucketOverflowEntry new_bucket_entry{ new_bucket_addr };
1956 bucket->overflow_entry.store(new_bucket_entry);
1957 bucket = &overflow_buckets_allocator_[version].Get(new_bucket_addr);
1958 next_idx = 0;
1959 }
1960 bucket->entries[next_idx].store(entry);
1961 ++next_idx;
1962}
1963
1964template <class K, class V, class D>
1965Address FasterKv<K, V, D>::TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size,
1966 Address from_address, Address min_address, uint8_t side) {
1967 assert(side == 0 || side == 1);
1968 // Search back as far as min_address.
1969 while(from_address >= min_address) {
1970 const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address));
1971 KeyHash hash = record->key().GetHash();
1972 if((hash.idx(new_size) < old_size) != (side == 0)) {
1973 // Record's key hashes to the other side.
1974 return from_address;
1975 }
1976 from_address = record->header.previous_address();
1977 }
1978 return from_address;
1979}
1980
1981template <class K, class V, class D>
1982void FasterKv<K, V, D>::SplitHashTableBuckets() {
1983 // This thread won't exit until all hash table buckets have been split.
1984 Address head_address = hlog.head_address.load();
1985 Address begin_address = hlog.begin_address.load();
1986 for(uint64_t chunk = grow_.next_chunk++; chunk < grow_.num_chunks; chunk = grow_.next_chunk++) {
1987 uint64_t old_size = state_[grow_.old_version].size();
1988 uint64_t new_size = state_[grow_.new_version].size();
1989 assert(new_size == old_size * 2);
1990 // Split this chunk.
1991 uint64_t upper_bound;
1992 if(chunk + 1 < grow_.num_chunks) {
1993 // All chunks but the last chunk contain kGrowHashTableChunkSize elements.
1994 upper_bound = kGrowHashTableChunkSize;
1995 } else {
1996 // Last chunk might contain more or fewer elements.
1997 upper_bound = old_size - (chunk * kGrowHashTableChunkSize);
1998 }
1999 for(uint64_t idx = 0; idx < upper_bound; ++idx) {
2000
2001 // Split this (chain of) bucket(s).
2002 HashBucket* old_bucket = &state_[grow_.old_version].bucket(
2003 chunk * kGrowHashTableChunkSize + idx);
2004 HashBucket* new_bucket0 = &state_[grow_.new_version].bucket(
2005 chunk * kGrowHashTableChunkSize + idx);
2006 HashBucket* new_bucket1 = &state_[grow_.new_version].bucket(
2007 old_size + chunk * kGrowHashTableChunkSize + idx);
2008 uint32_t new_entry_idx0 = 0;
2009 uint32_t new_entry_idx1 = 0;
2010 while(true) {
2011 for(uint32_t old_entry_idx = 0; old_entry_idx < HashBucket::kNumEntries; ++old_entry_idx) {
2012 HashBucketEntry old_entry = old_bucket->entries[old_entry_idx].load();
2013 if(old_entry.unused()) {
2014 // Nothing to do.
2015 continue;
2016 } else if(old_entry.address() < head_address) {
2017 // Can't tell which new bucket the entry should go into; put it in both.
2018 AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, old_entry);
2019 AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, old_entry);
2020 continue;
2021 }
2022
2023 const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(
2024 old_entry.address()));
2025 KeyHash hash = record->key().GetHash();
2026 if(hash.idx(new_size) < old_size) {
2027 // Record's key hashes to the 0 side of the new hash table.
2028 AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, old_entry);
2029 Address other_address = TraceBackForOtherChainStart(old_size, new_size,
2030 record->header.previous_address(), head_address, 0);
2031 if(other_address >= begin_address) {
2032 // We found a record that either is on disk or has a key that hashes to the 1 side of
2033 // the new hash table.
2034 AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version,
2035 HashBucketEntry{ other_address, old_entry.tag(), false });
2036 }
2037 } else {
2038 // Record's key hashes to the 1 side of the new hash table.
2039 AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, old_entry);
2040 Address other_address = TraceBackForOtherChainStart(old_size, new_size,
2041 record->header.previous_address(), head_address, 1);
2042 if(other_address >= begin_address) {
2043 // We found a record that either is on disk or has a key that hashes to the 0 side of
2044 // the new hash table.
2045 AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version,
2046 HashBucketEntry{ other_address, old_entry.tag(), false });
2047 }
2048 }
2049 }
2050 // Go to next bucket in the chain.
2051 HashBucketOverflowEntry overflow_entry = old_bucket->overflow_entry.load();
2052 if(overflow_entry.unused()) {
2053 // No more buckets in the chain.
2054 break;
2055 }
2056 old_bucket = &overflow_buckets_allocator_[grow_.old_version].Get(overflow_entry.address());
2057 }
2058 }
2059 // Done with this chunk.
2060 if(--grow_.num_pending_chunks == 0) {
2061 // Free the old hash table.
2062 state_[grow_.old_version].Uninitialize();
2063 overflow_buckets_allocator_[grow_.old_version].Uninitialize();
2064 break;
2065 }
2066 }
2067 // Thread has finished growing its part of the hash table.
2068 thread_ctx().phase = Phase::REST;
2069 // Thread ack that it has finished growing the hash table.
2070 if(epoch_.FinishThreadPhase(Phase::GROW_IN_PROGRESS)) {
2071 // Let other threads know that they can use the new hash table now.
2072 GlobalMoveToNextState(SystemState{ Action::GrowIndex, Phase::GROW_IN_PROGRESS,
2073 thread_ctx().version });
2074 } else {
2075 while(system_state_.load().phase == Phase::GROW_IN_PROGRESS) {
2076 // Spin until all other threads have finished splitting their chunks.
2077 std::this_thread::yield();
2078 }
2079 }
2080}
2081
2082template <class K, class V, class D>
2083bool FasterKv<K, V, D>::GlobalMoveToNextState(SystemState current_state) {
2084 SystemState next_state = current_state.GetNextState();
2085 if(!system_state_.compare_exchange_strong(current_state, next_state)) {
2086 return false;
2087 }
2088
2089 switch(next_state.action) {
2090 case Action::CheckpointFull:
2091 case Action::CheckpointIndex:
2092 case Action::CheckpointHybridLog:
2093 switch(next_state.phase) {
2094 case Phase::PREP_INDEX_CHKPT:
2095 // This case is handled directly inside Checkpoint[Index]().
2096 assert(false);
2097 break;
2098 case Phase::INDEX_CHKPT:
2099 assert(next_state.action != Action::CheckpointHybridLog);
2100 // Issue async request for fuzzy checkpoint
2101 assert(!checkpoint_.failed);
2102 if(CheckpointFuzzyIndex() != Status::Ok) {
2103 checkpoint_.failed = true;
2104 }
2105 break;
2106 case Phase::PREPARE:
2107 // Index checkpoint will never reach this state; and CheckpointHybridLog() will handle this
2108 // case directly.
2109 assert(next_state.action == Action::CheckpointFull);
2110 // INDEX_CHKPT -> PREPARE
2111 // Get an overestimate for the ofb's tail, after we've finished fuzzy-checkpointing the ofb.
2112 // (Ensures that recovery won't accidentally reallocate from the ofb.)
2113 checkpoint_.index_metadata.ofb_count =
2114 overflow_buckets_allocator_[resize_info_.version].count();
2115 // Write index meta data on disk
2116 if(WriteIndexMetadata() != Status::Ok) {
2117 checkpoint_.failed = true;
2118 }
2119 if(checkpoint_.index_persistence_callback) {
2120 // Notify the host that the index checkpoint has completed.
2121 checkpoint_.index_persistence_callback(Status::Ok);
2122 }
2123 break;
2124 case Phase::IN_PROGRESS: {
2125 assert(next_state.action != Action::CheckpointIndex);
2126 // PREPARE -> IN_PROGRESS
2127 // Do nothing
2128 break;
2129 }
2130 case Phase::WAIT_PENDING:
2131 assert(next_state.action != Action::CheckpointIndex);
2132 // IN_PROGRESS -> WAIT_PENDING
2133 // Do nothing
2134 break;
2135 case Phase::WAIT_FLUSH:
2136 assert(next_state.action != Action::CheckpointIndex);
2137 // WAIT_PENDING -> WAIT_FLUSH
2138 if(fold_over_snapshot) {
2139 // Move read-only to tail
2140 Address tail_address = hlog.ShiftReadOnlyToTail();
2141 // Get final address for CPR
2142 checkpoint_.log_metadata.final_address = tail_address;
2143 } else {
2144 Address tail_address = hlog.GetTailAddress();
2145 // Get final address for CPR
2146 checkpoint_.log_metadata.final_address = tail_address;
2147 checkpoint_.snapshot_file = disk.NewFile(disk.relative_cpr_checkpoint_path(
2148 checkpoint_.hybrid_log_token) + "snapshot.dat");
2149 if(checkpoint_.snapshot_file.Open(&disk.handler()) != Status::Ok) {
2150 checkpoint_.failed = true;
2151 }
2152 // Flush the log to a snapshot.
2153 hlog.AsyncFlushPagesToFile(checkpoint_.log_metadata.flushed_address.page(),
2154 checkpoint_.log_metadata.final_address, checkpoint_.snapshot_file,
2155 checkpoint_.flush_pending);
2156 }
2157 // Write CPR meta data file
2158 if(WriteCprMetadata() != Status::Ok) {
2159 checkpoint_.failed = true;
2160 }
2161 break;
2162 case Phase::PERSISTENCE_CALLBACK:
2163 assert(next_state.action != Action::CheckpointIndex);
2164 // WAIT_FLUSH -> PERSISTENCE_CALLBACK
2165 break;
2166 case Phase::REST:
2167 // PERSISTENCE_CALLBACK -> REST or INDEX_CHKPT -> REST
2168 if(next_state.action != Action::CheckpointIndex) {
2169 // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
2170 // another checkpoint can be started.)
2171 checkpoint_.CheckpointDone();
2172 // Free checkpoint locks!
2173 checkpoint_locks_.Free();
2174 // Checkpoint is done--no more work for threads to do.
2175 system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
2176 } else {
2177 // Get an overestimate for the ofb's tail, after we've finished fuzzy-checkpointing the
2178 // ofb. (Ensures that recovery won't accidentally reallocate from the ofb.)
2179 checkpoint_.index_metadata.ofb_count =
2180 overflow_buckets_allocator_[resize_info_.version].count();
2181 // Write index meta data on disk
2182 if(WriteIndexMetadata() != Status::Ok) {
2183 checkpoint_.failed = true;
2184 }
2185 auto index_persistence_callback = checkpoint_.index_persistence_callback;
2186 // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
2187 // another checkpoint can be started.)
2188 checkpoint_.CheckpointDone();
2189 // Checkpoint is done--no more work for threads to do.
2190 system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
2191 if(index_persistence_callback) {
2192 // Notify the host that the index checkpoint has completed.
2193 index_persistence_callback(Status::Ok);
2194 }
2195 }
2196 break;
2197 default:
2198 // not reached
2199 assert(false);
2200 break;
2201 }
2202 break;
2203 case Action::GC:
2204 switch(next_state.phase) {
2205 case Phase::GC_IO_PENDING:
2206 // This case is handled directly inside ShiftBeginAddress().
2207 assert(false);
2208 break;
2209 case Phase::GC_IN_PROGRESS:
2210 // GC_IO_PENDING -> GC_IN_PROGRESS
2211 // Tell the disk to truncate the log.
2212 hlog.Truncate(gc_.truncate_callback);
2213 break;
2214 case Phase::REST:
2215 // GC_IN_PROGRESS -> REST
2216 // GC is done--no more work for threads to do.
2217 if(gc_.complete_callback) {
2218 gc_.complete_callback();
2219 }
2220 system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
2221 break;
2222 default:
2223 // not reached
2224 assert(false);
2225 break;
2226 }
2227 break;
2228 case Action::GrowIndex:
2229 switch(next_state.phase) {
2230 case Phase::GROW_PREPARE:
2231 // This case is handled directly inside GrowIndex().
2232 assert(false);
2233 break;
2234 case Phase::GROW_IN_PROGRESS:
2235 // Swap hash table versions so that all threads will use the new version after populating it.
2236 resize_info_.version = grow_.new_version;
2237 break;
2238 case Phase::REST:
2239 if(grow_.callback) {
2240 grow_.callback(state_[grow_.new_version].size());
2241 }
2242 system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
2243 break;
2244 default:
2245 // not reached
2246 assert(false);
2247 break;
2248 }
2249 break;
2250 default:
2251 // not reached
2252 assert(false);
2253 break;
2254 }
2255 return true;
2256}
2257
2258template <class K, class V, class D>
2259void FasterKv<K, V, D>::MarkAllPendingRequests() {
2260 uint32_t table_version = resize_info_.version;
2261 uint64_t table_size = state_[table_version].size();
2262
2263 for(const IAsyncContext* ctxt : thread_ctx().retry_requests) {
2264 const pending_context_t* context = static_cast<const pending_context_t*>(ctxt);
2265 // We will succeed, since no other thread can currently advance the entry's version, since this
2266 // thread hasn't acked "PENDING" phase completion yet.
2267 bool result = checkpoint_locks_.get_lock(context->key().GetHash()).try_lock_old();
2268 assert(result);
2269 }
2270 for(const auto& pending_io : thread_ctx().pending_ios) {
2271 // We will succeed, since no other thread can currently advance the entry's version, since this
2272 // thread hasn't acked "PENDING" phase completion yet.
2273 bool result = checkpoint_locks_.get_lock(pending_io.second).try_lock_old();
2274 assert(result);
2275 }
2276}
2277
2278template <class K, class V, class D>
2279void FasterKv<K, V, D>::HandleSpecialPhases() {
2280 SystemState final_state = system_state_.load();
2281 if(final_state.phase == Phase::REST) {
2282 // Nothing to do; just reset thread context.
2283 thread_ctx().phase = Phase::REST;
2284 thread_ctx().version = final_state.version;
2285 return;
2286 }
2287 SystemState previous_state{ final_state.action, thread_ctx().phase, thread_ctx().version };
2288 do {
2289 // Identify the transition (currentState -> nextState)
2290 SystemState current_state = (previous_state == final_state) ? final_state :
2291 previous_state.GetNextState();
2292 switch(current_state.action) {
2293 case Action::CheckpointFull:
2294 case Action::CheckpointIndex:
2295 case Action::CheckpointHybridLog:
2296 switch(current_state.phase) {
2297 case Phase::PREP_INDEX_CHKPT:
2298 assert(current_state.action != Action::CheckpointHybridLog);
2299 // Both from REST -> PREP_INDEX_CHKPT and PREP_INDEX_CHKPT -> PREP_INDEX_CHKPT
2300 if(previous_state.phase == Phase::REST) {
2301 // Thread ack that we're performing a checkpoint.
2302 if(epoch_.FinishThreadPhase(Phase::PREP_INDEX_CHKPT)) {
2303 GlobalMoveToNextState(current_state);
2304 }
2305 }
2306 break;
2307 case Phase::INDEX_CHKPT: {
2308 assert(current_state.action != Action::CheckpointHybridLog);
2309 // Both from PREP_INDEX_CHKPT -> INDEX_CHKPT and INDEX_CHKPT -> INDEX_CHKPT
2310 Status result = CheckpointFuzzyIndexComplete();
2311 if(result != Status::Pending && result != Status::Ok) {
2312 checkpoint_.failed = true;
2313 }
2314 if(result != Status::Pending) {
2315 if(current_state.action == Action::CheckpointIndex) {
2316 // This thread is done now.
2317 thread_ctx().phase = Phase::REST;
2318 // Thread ack that it is done.
2319 if(epoch_.FinishThreadPhase(Phase::INDEX_CHKPT)) {
2320 GlobalMoveToNextState(current_state);
2321 }
2322 } else {
2323 // Index checkpoint is done; move on to PREPARE phase.
2324 GlobalMoveToNextState(current_state);
2325 }
2326 }
2327 break;
2328 }
2329 case Phase::PREPARE:
2330 assert(current_state.action != Action::CheckpointIndex);
2331 // Handle (INDEX_CHKPT -> PREPARE or REST -> PREPARE) and PREPARE -> PREPARE
2332 if(previous_state.phase != Phase::PREPARE) {
2333 // mark pending requests
2334 MarkAllPendingRequests();
2335 // keep a count of number of threads
2336 ++checkpoint_.log_metadata.num_threads;
2337 // set the thread index
2338 checkpoint_.log_metadata.guids[Thread::id()] = thread_ctx().guid;
2339 // Thread ack that it has finished marking its pending requests.
2340 if(epoch_.FinishThreadPhase(Phase::PREPARE)) {
2341 GlobalMoveToNextState(current_state);
2342 }
2343 }
2344 break;
2345 case Phase::IN_PROGRESS:
2346 assert(current_state.action != Action::CheckpointIndex);
2347 // Handle PREPARE -> IN_PROGRESS and IN_PROGRESS -> IN_PROGRESS
2348 if(previous_state.phase == Phase::PREPARE) {
2349 assert(prev_thread_ctx().retry_requests.empty());
2350 assert(prev_thread_ctx().pending_ios.empty());
2351 assert(prev_thread_ctx().io_responses.empty());
2352
2353 // Get a new thread context; keep track of the old one as "previous."
2354 thread_contexts_[Thread::id()].swap();
2355 // initialize a new local context
2356 thread_ctx().Initialize(Phase::IN_PROGRESS, current_state.version,
2357 prev_thread_ctx().guid, prev_thread_ctx().serial_num);
2358 // Thread ack that it has swapped contexts.
2359 if(epoch_.FinishThreadPhase(Phase::IN_PROGRESS)) {
2360 GlobalMoveToNextState(current_state);
2361 }
2362 }
2363 break;
2364 case Phase::WAIT_PENDING:
2365 assert(current_state.action != Action::CheckpointIndex);
2366 // Handle IN_PROGRESS -> WAIT_PENDING and WAIT_PENDING -> WAIT_PENDING
2367 if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_PENDING)) {
2368 if(prev_thread_ctx().pending_ios.empty() &&
2369 prev_thread_ctx().retry_requests.empty()) {
2370 // Thread ack that it has completed its pending I/Os.
2371 if(epoch_.FinishThreadPhase(Phase::WAIT_PENDING)) {
2372 GlobalMoveToNextState(current_state);
2373 }
2374 }
2375 }
2376 break;
2377 case Phase::WAIT_FLUSH:
2378 assert(current_state.action != Action::CheckpointIndex);
2379 // Handle WAIT_PENDING -> WAIT_FLUSH and WAIT_FLUSH -> WAIT_FLUSH
2380 if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_FLUSH)) {
2381 bool flushed;
2382 if(fold_over_snapshot) {
2383 flushed = hlog.flushed_until_address.load() >= checkpoint_.log_metadata.final_address;
2384 } else {
2385 flushed = checkpoint_.flush_pending.load() == 0;
2386 }
2387 if(flushed) {
2388 // write context info
2389 WriteCprContext();
2390 // Thread ack that it has written its CPU context.
2391 if(epoch_.FinishThreadPhase(Phase::WAIT_FLUSH)) {
2392 GlobalMoveToNextState(current_state);
2393 }
2394 }
2395 }
2396 break;
2397 case Phase::PERSISTENCE_CALLBACK:
2398 assert(current_state.action != Action::CheckpointIndex);
2399 // Handle WAIT_FLUSH -> PERSISTENCE_CALLBACK and PERSISTENCE_CALLBACK -> PERSISTENCE_CALLBACK
2400 if(previous_state.phase == Phase::WAIT_FLUSH) {
2401 // Persistence callback
2402 if(checkpoint_.hybrid_log_persistence_callback) {
2403 checkpoint_.hybrid_log_persistence_callback(Status::Ok, prev_thread_ctx().serial_num);
2404 }
2405 // Thread has finished checkpointing.
2406 thread_ctx().phase = Phase::REST;
2407 // Thread ack that it has finished checkpointing.
2408 if(epoch_.FinishThreadPhase(Phase::PERSISTENCE_CALLBACK)) {
2409 GlobalMoveToNextState(current_state);
2410 }
2411 }
2412 break;
2413 default:
2414 // nothing to do.
2415 break;
2416 }
2417 break;
2418 case Action::GC:
2419 switch(current_state.phase) {
2420 case Phase::GC_IO_PENDING:
2421 // Handle REST -> GC_IO_PENDING and GC_IO_PENDING -> GC_IO_PENDING.
2422 if(previous_state.phase == Phase::REST) {
2423 assert(prev_thread_ctx().retry_requests.empty());
2424 assert(prev_thread_ctx().pending_ios.empty());
2425 assert(prev_thread_ctx().io_responses.empty());
2426 // Get a new thread context; keep track of the old one as "previous."
2427 thread_contexts_[Thread::id()].swap();
2428 // initialize a new local context
2429 thread_ctx().Initialize(Phase::GC_IO_PENDING, current_state.version,
2430 prev_thread_ctx().guid, prev_thread_ctx().serial_num);
2431 }
2432
2433 // See if the old thread context has completed its pending I/Os.
2434 if(!epoch_.HasThreadFinishedPhase(Phase::GC_IO_PENDING)) {
2435 if(prev_thread_ctx().pending_ios.empty() &&
2436 prev_thread_ctx().retry_requests.empty()) {
2437 // Thread ack that it has completed its pending I/Os.
2438 if(epoch_.FinishThreadPhase(Phase::GC_IO_PENDING)) {
2439 GlobalMoveToNextState(current_state);
2440 }
2441 }
2442 }
2443 break;
2444 case Phase::GC_IN_PROGRESS:
2445 // Handle GC_IO_PENDING -> GC_IN_PROGRESS and GC_IN_PROGRESS -> GC_IN_PROGRESS.
2446 if(!epoch_.HasThreadFinishedPhase(Phase::GC_IN_PROGRESS)) {
2447 if(!CleanHashTableBuckets()) {
2448 // No more buckets for this thread to clean; thread has finished GC.
2449 thread_ctx().phase = Phase::REST;
2450 // Thread ack that it has finished GC.
2451 if(epoch_.FinishThreadPhase(Phase::GC_IN_PROGRESS)) {
2452 GlobalMoveToNextState(current_state);
2453 }
2454 }
2455 }
2456 break;
2457 default:
2458 assert(false); // not reached
2459 break;
2460 }
2461 break;
2462 case Action::GrowIndex:
2463 switch(current_state.phase) {
2464 case Phase::GROW_PREPARE:
2465 if(previous_state.phase == Phase::REST) {
2466 // Thread ack that we're going to grow the hash table.
2467 if(epoch_.FinishThreadPhase(Phase::GROW_PREPARE)) {
2468 GlobalMoveToNextState(current_state);
2469 }
2470 } else {
2471 // Wait for all other threads to finish their outstanding (synchronous) hash table
2472 // operations.
2473 std::this_thread::yield();
2474 }
2475 break;
2476 case Phase::GROW_IN_PROGRESS:
2477 SplitHashTableBuckets();
2478 break;
2479 }
2480 break;
2481 }
2482 thread_ctx().phase = current_state.phase;
2483 thread_ctx().version = current_state.version;
2484 previous_state = current_state;
2485 } while(previous_state != final_state);
2486}
2487
2488template <class K, class V, class D>
2489bool FasterKv<K, V, D>::Checkpoint(void(*index_persistence_callback)(Status result),
2490 void(*hybrid_log_persistence_callback)(Status result,
2491 uint64_t persistent_serial_num), Guid& token) {
2492 // Only one thread can initiate a checkpoint at a time.
2493 SystemState expected{ Action::None, Phase::REST, system_state_.load().version };
2494 SystemState desired{ Action::CheckpointFull, Phase::REST, expected.version };
2495 if(!system_state_.compare_exchange_strong(expected, desired)) {
2496 // Can't start a new checkpoint while a checkpoint or recovery is already in progress.
2497 return false;
2498 }
2499 // We are going to start a checkpoint.
2500 epoch_.ResetPhaseFinished();
2501 // Initialize all contexts
2502 token = Guid::Create();
2503 disk.CreateIndexCheckpointDirectory(token);
2504 disk.CreateCprCheckpointDirectory(token);
2505 // Obtain tail address for fuzzy index checkpoint
2506 if(!fold_over_snapshot) {
2507 checkpoint_.InitializeCheckpoint(token, desired.version, state_[resize_info_.version].size(),
2508 hlog.begin_address.load(), hlog.GetTailAddress(), true,
2509 hlog.flushed_until_address.load(),
2510 index_persistence_callback,
2511 hybrid_log_persistence_callback);
2512 } else {
2513 checkpoint_.InitializeCheckpoint(token, desired.version, state_[resize_info_.version].size(),
2514 hlog.begin_address.load(), hlog.GetTailAddress(), false,
2515 Address::kInvalidAddress, index_persistence_callback,
2516 hybrid_log_persistence_callback);
2517
2518 }
2519 InitializeCheckpointLocks();
2520 // Let other threads know that the checkpoint has started.
2521 system_state_.store(desired.GetNextState());
2522 return true;
2523}
2524
2525template <class K, class V, class D>
2526bool FasterKv<K, V, D>::CheckpointIndex(void(*index_persistence_callback)(Status result),
2527 Guid& token) {
2528 // Only one thread can initiate a checkpoint at a time.
2529 SystemState expected{ Action::None, Phase::REST, system_state_.load().version };
2530 SystemState desired{ Action::CheckpointIndex, Phase::REST, expected.version };
2531 if(!system_state_.compare_exchange_strong(expected, desired)) {
2532 // Can't start a new checkpoint while a checkpoint or recovery is already in progress.
2533 return false;
2534 }
2535 // We are going to start a checkpoint.
2536 epoch_.ResetPhaseFinished();
2537 // Initialize all contexts
2538 token = Guid::Create();
2539 disk.CreateIndexCheckpointDirectory(token);
2540 checkpoint_.InitializeIndexCheckpoint(token, desired.version,
2541 state_[resize_info_.version].size(),
2542 hlog.begin_address.load(), hlog.GetTailAddress(),
2543 index_persistence_callback);
2544 // Let other threads know that the checkpoint has started.
2545 system_state_.store(desired.GetNextState());
2546 return true;
2547}
2548
2549template <class K, class V, class D>
2550bool FasterKv<K, V, D>::CheckpointHybridLog(void(*hybrid_log_persistence_callback)(Status result,
2551 uint64_t persistent_serial_num), Guid& token) {
2552 // Only one thread can initiate a checkpoint at a time.
2553 SystemState expected{ Action::None, Phase::REST, system_state_.load().version };
2554 SystemState desired{ Action::CheckpointHybridLog, Phase::REST, expected.version };
2555 if(!system_state_.compare_exchange_strong(expected, desired)) {
2556 // Can't start a new checkpoint while a checkpoint or recovery is already in progress.
2557 return false;
2558 }
2559 // We are going to start a checkpoint.
2560 epoch_.ResetPhaseFinished();
2561 // Initialize all contexts
2562 token = Guid::Create();
2563 disk.CreateCprCheckpointDirectory(token);
2564 // Obtain tail address for fuzzy index checkpoint
2565 if(!fold_over_snapshot) {
2566 checkpoint_.InitializeHybridLogCheckpoint(token, desired.version, true,
2567 hlog.flushed_until_address.load(), hybrid_log_persistence_callback);
2568 } else {
2569 checkpoint_.InitializeHybridLogCheckpoint(token, desired.version, false,
2570 Address::kInvalidAddress, hybrid_log_persistence_callback);
2571 }
2572 InitializeCheckpointLocks();
2573 // Let other threads know that the checkpoint has started.
2574 system_state_.store(desired.GetNextState());
2575 return true;
2576}
2577
2578template <class K, class V, class D>
2579Status FasterKv<K, V, D>::Recover(const Guid& index_token, const Guid& hybrid_log_token,
2580 uint32_t& version,
2581 std::vector<Guid>& session_ids) {
2582 version = 0;
2583 session_ids.clear();
2584 SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version };
2585 if(!system_state_.compare_exchange_strong(expected,
2586 SystemState{ Action::Recover, Phase::REST, expected.version })) {
2587 return Status::Aborted;
2588 }
2589 checkpoint_.InitializeRecover(index_token, hybrid_log_token);
2590 Status status;
2591#define BREAK_NOT_OK(s) \
2592 status = (s); \
2593 if (status != Status::Ok) break
2594
2595 do {
2596 // Index and log metadata.
2597 BREAK_NOT_OK(ReadIndexMetadata(index_token));
2598 BREAK_NOT_OK(ReadCprMetadata(hybrid_log_token));
2599 if(checkpoint_.index_metadata.version != checkpoint_.log_metadata.version) {
2600 // Index and hybrid-log checkpoints should have the same version.
2601 status = Status::Corruption;
2602 break;
2603 }
2604
2605 system_state_.store(SystemState{ Action::Recover, Phase::REST,
2606 checkpoint_.log_metadata.version + 1 });
2607
2608 BREAK_NOT_OK(ReadCprContexts(hybrid_log_token, checkpoint_.log_metadata.guids));
2609 // The index itself (including overflow buckets).
2610 BREAK_NOT_OK(RecoverFuzzyIndex());
2611 BREAK_NOT_OK(RecoverFuzzyIndexComplete(true));
2612 // Any changes made to the log while the index was being fuzzy-checkpointed.
2613 if(fold_over_snapshot) {
2614 BREAK_NOT_OK(RecoverHybridLog());
2615 } else {
2616 BREAK_NOT_OK(RecoverHybridLogFromSnapshotFile());
2617 }
2618 BREAK_NOT_OK(RestoreHybridLog());
2619 } while(false);
2620 if(status == Status::Ok) {
2621 for(const auto& token : checkpoint_.continue_tokens) {
2622 session_ids.push_back(token.first);
2623 }
2624 version = checkpoint_.log_metadata.version;
2625 }
2626 checkpoint_.RecoverDone();
2627 system_state_.store(SystemState{ Action::None, Phase::REST,
2628 checkpoint_.log_metadata.version + 1 });
2629 return status;
2630#undef BREAK_NOT_OK
2631}
2632
2633template <class K, class V, class D>
2634bool FasterKv<K, V, D>::ShiftBeginAddress(Address address,
2635 GcState::truncate_callback_t truncate_callback,
2636 GcState::complete_callback_t complete_callback) {
2637 SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version };
2638 if(!system_state_.compare_exchange_strong(expected,
2639 SystemState{ Action::GC, Phase::REST, expected.version })) {
2640 // Can't start a GC while an action is already in progress.
2641 return false;
2642 }
2643 hlog.begin_address.store(address);
2644 // Each active thread will notify the epoch when all pending I/Os have completed.
2645 epoch_.ResetPhaseFinished();
2646 uint64_t num_chunks = std::max(state_[resize_info_.version].size() / kGcHashTableChunkSize,
2647 (uint64_t)1);
2648 gc_.Initialize(truncate_callback, complete_callback, num_chunks);
2649 // Let other threads know to complete their pending I/Os, so that the log can be truncated.
2650 system_state_.store(SystemState{ Action::GC, Phase::GC_IO_PENDING, expected.version });
2651 return true;
2652}
2653
2654template <class K, class V, class D>
2655bool FasterKv<K, V, D>::GrowIndex(GrowState::callback_t caller_callback) {
2656 SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version };
2657 if(!system_state_.compare_exchange_strong(expected,
2658 SystemState{ Action::GrowIndex, Phase::REST, expected.version })) {
2659 // An action is already in progress.
2660 return false;
2661 }
2662 epoch_.ResetPhaseFinished();
2663 uint8_t current_version = resize_info_.version;
2664 assert(current_version == 0 || current_version == 1);
2665 uint8_t next_version = 1 - current_version;
2666 uint64_t num_chunks = std::max(state_[current_version].size() / kGrowHashTableChunkSize,
2667 (uint64_t)1);
2668 grow_.Initialize(caller_callback, current_version, num_chunks);
2669 // Initialize the next version of our hash table to be twice the size of the current version.
2670 state_[next_version].Initialize(state_[current_version].size() * 2, disk.log().alignment());
2671 overflow_buckets_allocator_[next_version].Initialize(disk.log().alignment(), epoch_);
2672
2673 SystemState next = SystemState{ Action::GrowIndex, Phase::GROW_PREPARE, expected.version };
2674 system_state_.store(next);
2675
2676 // Let this thread know it should be growing the index.
2677 Refresh();
2678 return true;
2679}
2680
2681}
2682} // namespace FASTER::core