1 | // Copyright (c) Microsoft Corporation. All rights reserved. |
2 | // Licensed under the MIT license. |
3 | |
4 | #pragma once |
5 | |
6 | #include <atomic> |
7 | #include <cassert> |
8 | #include <cinttypes> |
9 | #include <cstdint> |
10 | #include <cstdio> |
11 | #include <cstring> |
12 | #include <type_traits> |
13 | |
14 | #include "device/file_system_disk.h" |
15 | |
16 | #include "alloc.h" |
17 | #include "checkpoint_locks.h" |
18 | #include "checkpoint_state.h" |
19 | #include "constants.h" |
20 | #include "gc_state.h" |
21 | #include "grow_state.h" |
22 | #include "guid.h" |
23 | #include "hash_table.h" |
24 | #include "internal_contexts.h" |
25 | #include "key_hash.h" |
26 | #include "malloc_fixed_page_size.h" |
27 | #include "persistent_memory_malloc.h" |
28 | #include "record.h" |
29 | #include "recovery_status.h" |
30 | #include "state_transitions.h" |
31 | #include "status.h" |
32 | #include "utility.h" |
33 | |
34 | using namespace std::chrono_literals; |
35 | |
36 | /// The FASTER key-value store, and related classes. |
37 | |
38 | namespace FASTER { |
39 | namespace core { |
40 | |
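/// Each thread keeps two execution contexts, the "current" one and the "previous" one, and swaps
/// them when it crosses a checkpoint version boundary, so that operations that went pending under
/// the previous version can still be completed against the old context (see CompletePending()).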
41 | class alignas(Constants::kCacheLineBytes) ThreadContext { |
42 | public: |
43 | ThreadContext() |
44 | : contexts_{} |
45 | , cur_{ 0 } { |
46 | } |
47 | |
48 | inline const ExecutionContext& cur() const { |
49 | return contexts_[cur_]; |
50 | } |
51 | inline ExecutionContext& cur() { |
52 | return contexts_[cur_]; |
53 | } |
54 | |
55 | inline const ExecutionContext& prev() const { |
56 | return contexts_[(cur_ + 1) % 2]; |
57 | } |
58 | inline ExecutionContext& prev() { |
59 | return contexts_[(cur_ + 1) % 2]; |
60 | } |
61 | |
62 | inline void swap() { |
63 | cur_ = (cur_ + 1) % 2; |
64 | } |
65 | |
66 | private: |
67 | ExecutionContext contexts_[2]; |
68 | uint8_t cur_; |
69 | }; |
70 | static_assert(sizeof(ThreadContext) == 448, "sizeof(ThreadContext) != 448" ); |
71 | |
72 | /// The FASTER key-value store. |
73 | template <class K, class V, class D> |
74 | class FasterKv { |
75 | public: |
76 | typedef FasterKv<K, V, D> faster_t; |
77 | |
78 | /// Keys and values stored in this key-value store. |
79 | typedef K key_t; |
80 | typedef V value_t; |
81 | |
82 | typedef D disk_t; |
83 | typedef typename D::file_t file_t; |
84 | typedef typename D::log_file_t log_file_t; |
85 | |
86 | typedef PersistentMemoryMalloc<disk_t> hlog_t; |
87 | |
  /// Contexts that have been deep-copied for async continuations; they must be accessed via
  /// virtual function calls.
90 | typedef AsyncPendingReadContext<key_t> async_pending_read_context_t; |
91 | typedef AsyncPendingUpsertContext<key_t> async_pending_upsert_context_t; |
92 | typedef AsyncPendingRmwContext<key_t> async_pending_rmw_context_t; |
93 | |
94 | FasterKv(uint64_t table_size, uint64_t log_size, const std::string& filename, |
95 | double log_mutable_fraction = 0.9) |
96 | : min_table_size_{ table_size } |
97 | , disk{ filename, epoch_ } |
98 | , hlog{ log_size, epoch_, disk, disk.log(), log_mutable_fraction } |
99 | , system_state_{ Action::None, Phase::REST, 1 } |
100 | , num_pending_ios{ 0 } { |
101 | if(!Utility::IsPowerOfTwo(table_size)) { |
      throw std::invalid_argument{ "Size is not a power of 2" };
103 | } |
104 | if(table_size > INT32_MAX) { |
      throw std::invalid_argument{ "Cannot allocate such a large hash table" };
106 | } |
107 | |
108 | resize_info_.version = 0; |
109 | state_[0].Initialize(table_size, disk.log().alignment()); |
110 | overflow_buckets_allocator_[0].Initialize(disk.log().alignment(), epoch_); |
111 | } |
112 | |
113 | // No copy constructor. |
114 | FasterKv(const FasterKv& other) = delete; |
115 | |
116 | public: |
117 | /// Thread-related operations |
118 | Guid StartSession(); |
119 | uint64_t ContinueSession(const Guid& guid); |
120 | void StopSession(); |
121 | void Refresh(); |
122 | |
123 | /// Store interface |
124 | template <class RC> |
125 | inline Status Read(RC& context, AsyncCallback callback, uint64_t monotonic_serial_num); |
126 | |
127 | template <class UC> |
128 | inline Status Upsert(UC& context, AsyncCallback callback, uint64_t monotonic_serial_num); |
129 | |
130 | template <class MC> |
131 | inline Status Rmw(MC& context, AsyncCallback callback, uint64_t monotonic_serial_num); |
132 | /// Delete() not yet implemented! |
133 | // void Delete(const Key& key, Context& context, uint64_t lsn); |
134 | inline bool CompletePending(bool wait = false); |
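
  /// Illustrative call sequence (a sketch only: MyUpsertContext and MyReadContext stand in for
  /// application-defined context types, and `callback` receives the caller's IAsyncContext plus a
  /// Status, as with `caller_callback` below). Given a FasterKv instance `store`:
  ///
  ///   store.StartSession();
  ///   MyUpsertContext upsert_ctx{ /* key, value */ };
  ///   Status status = store.Upsert(upsert_ctx, callback, 1 /* monotonic serial number */);
  ///   MyReadContext read_ctx{ /* key */ };
  ///   status = store.Read(read_ctx, callback, 2);
  ///   if(status == Status::Pending) {
  ///     store.CompletePending(true);  // Drain pending I/Os; async callbacks fire here.
  ///   }
  ///   store.StopSession();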
135 | |
136 | /// Checkpoint/recovery operations. |
137 | bool Checkpoint(void(*index_persistence_callback)(Status result), |
138 | void(*hybrid_log_persistence_callback)(Status result, |
139 | uint64_t persistent_serial_num), Guid& token); |
140 | bool CheckpointIndex(void(*index_persistence_callback)(Status result), Guid& token); |
141 | bool CheckpointHybridLog(void(*hybrid_log_persistence_callback)(Status result, |
142 | uint64_t persistent_serial_num), Guid& token); |
143 | Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version, |
144 | std::vector<Guid>& session_ids); |
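
  /// Illustrative checkpoint/recovery sequence (a sketch only; the callback names are
  /// hypothetical, and the combined Checkpoint() uses a single token for both the index and the
  /// hybrid log):
  ///
  ///   Guid token;
  ///   store.Checkpoint(index_persistence_callback, hybrid_log_persistence_callback, token);
  ///   ...
  ///   // After a restart, recover from that checkpoint and resume sessions:
  ///   uint32_t version;
  ///   std::vector<Guid> session_ids;
  ///   store.Recover(token, token, version, session_ids);
  ///   uint64_t serial_num = store.ContinueSession(session_ids[0]);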
145 | |
146 | /// Truncating the head of the log. |
147 | bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback, |
148 | GcState::complete_callback_t complete_callback); |
149 | |
150 | /// Make the hash table larger. |
151 | bool GrowIndex(GrowState::callback_t caller_callback); |
152 | |
153 | /// Statistics |
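  /// (Size() reports the hybrid log's tail address, i.e. roughly the number of bytes allocated in
  /// the log so far, not a count of records.)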
154 | inline uint64_t Size() const { |
155 | return hlog.GetTailAddress().control(); |
156 | } |
157 | inline void DumpDistribution() { |
158 | state_[resize_info_.version].DumpDistribution( |
159 | overflow_buckets_allocator_[resize_info_.version]); |
160 | } |
161 | |
162 | private: |
163 | typedef Record<key_t, value_t> record_t; |
164 | |
165 | typedef PendingContext<key_t> pending_context_t; |
166 | |
167 | template <class C> |
168 | inline OperationStatus InternalRead(C& pending_context) const; |
169 | |
170 | template <class C> |
171 | inline OperationStatus InternalUpsert(C& pending_context); |
172 | |
173 | template <class C> |
174 | inline OperationStatus InternalRmw(C& pending_context, bool retrying); |
175 | |
176 | inline OperationStatus InternalRetryPendingRmw(async_pending_rmw_context_t& pending_context); |
177 | |
178 | OperationStatus InternalContinuePendingRead(ExecutionContext& ctx, |
179 | AsyncIOContext& io_context); |
180 | OperationStatus InternalContinuePendingRmw(ExecutionContext& ctx, |
181 | AsyncIOContext& io_context); |
182 | |
183 | // Find the hash bucket entry, if any, corresponding to the specified hash. |
184 | inline const AtomicHashBucketEntry* FindEntry(KeyHash hash) const; |
185 | // If a hash bucket entry corresponding to the specified hash exists, return it; otherwise, |
186 | // create a new entry. The caller can use the "expected_entry" to CAS its desired address into |
187 | // the entry. |
188 | inline AtomicHashBucketEntry* FindOrCreateEntry(KeyHash hash, HashBucketEntry& expected_entry, |
189 | HashBucket*& bucket); |
190 | inline Address TraceBackForKeyMatch(const key_t& key, Address from_address, |
191 | Address min_offset) const; |
192 | Address TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, Address from_address, |
193 | Address min_address, uint8_t side); |
194 | |
195 | // If a hash bucket entry corresponding to the specified hash exists, return it; otherwise, |
196 | // return an unused bucket entry. |
197 | inline AtomicHashBucketEntry* FindTentativeEntry(KeyHash hash, HashBucket* bucket, |
198 | uint8_t version, HashBucketEntry& expected_entry); |
  // Looks for another entry in the bucket chain that carries the same tag as the given
  // (tentative) entry.
200 | inline bool HasConflictingEntry(KeyHash hash, const HashBucket* bucket, uint8_t version, |
201 | const AtomicHashBucketEntry* atomic_entry) const; |
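  // (FindTentativeEntry() and HasConflictingEntry() together implement a two-phase insert: a
  // thread first installs its entry with the "tentative" bit set, then scans the bucket chain for
  // another entry with the same tag. If one is found it backs off; otherwise it clears the
  // tentative bit, making the entry visible. See FindOrCreateEntry().)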
202 | |
203 | inline Address BlockAllocate(uint32_t record_size); |
204 | |
205 | inline Status HandleOperationStatus(ExecutionContext& ctx, |
206 | pending_context_t& pending_context, |
207 | OperationStatus internal_status, bool& async); |
208 | inline Status PivotAndRetry(ExecutionContext& ctx, pending_context_t& pending_context, |
209 | bool& async); |
210 | inline Status RetryLater(ExecutionContext& ctx, pending_context_t& pending_context, |
211 | bool& async); |
212 | inline constexpr uint32_t MinIoRequestSize() const; |
213 | inline Status IssueAsyncIoRequest(ExecutionContext& ctx, pending_context_t& pending_context, |
214 | bool& async); |
215 | |
216 | void AsyncGetFromDisk(Address address, uint32_t num_records, AsyncIOCallback callback, |
217 | AsyncIOContext& context); |
218 | static void AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result, |
219 | size_t bytes_transferred); |
220 | |
221 | void CompleteIoPendingRequests(ExecutionContext& context); |
222 | void CompleteRetryRequests(ExecutionContext& context); |
223 | |
224 | void InitializeCheckpointLocks(); |
225 | |
226 | /// Checkpoint/recovery methods. |
227 | void HandleSpecialPhases(); |
228 | bool GlobalMoveToNextState(SystemState current_state); |
229 | |
230 | Status CheckpointFuzzyIndex(); |
231 | Status CheckpointFuzzyIndexComplete(); |
232 | Status RecoverFuzzyIndex(); |
233 | Status RecoverFuzzyIndexComplete(bool wait); |
234 | |
235 | Status WriteIndexMetadata(); |
236 | Status ReadIndexMetadata(const Guid& token); |
237 | Status WriteCprMetadata(); |
238 | Status ReadCprMetadata(const Guid& token); |
239 | Status WriteCprContext(); |
240 | Status ReadCprContexts(const Guid& token, const Guid* guids); |
241 | |
242 | Status RecoverHybridLog(); |
243 | Status RecoverHybridLogFromSnapshotFile(); |
244 | Status RecoverFromPage(Address from_address, Address to_address); |
245 | Status RestoreHybridLog(); |
246 | |
247 | void MarkAllPendingRequests(); |
248 | |
249 | inline void HeavyEnter(); |
250 | bool CleanHashTableBuckets(); |
251 | void SplitHashTableBuckets(); |
252 | void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version, |
253 | HashBucketEntry entry); |
254 | |
255 | /// Access the current and previous (thread-local) execution contexts. |
256 | const ExecutionContext& thread_ctx() const { |
257 | return thread_contexts_[Thread::id()].cur(); |
258 | } |
259 | ExecutionContext& thread_ctx() { |
260 | return thread_contexts_[Thread::id()].cur(); |
261 | } |
262 | ExecutionContext& prev_thread_ctx() { |
263 | return thread_contexts_[Thread::id()].prev(); |
264 | } |
265 | |
266 | private: |
267 | LightEpoch epoch_; |
268 | |
269 | public: |
270 | disk_t disk; |
271 | hlog_t hlog; |
272 | |
273 | private: |
274 | static constexpr bool kCopyReadsToTail = false; |
275 | static constexpr uint64_t kGcHashTableChunkSize = 16384; |
276 | static constexpr uint64_t kGrowHashTableChunkSize = 16384; |
277 | |
278 | bool fold_over_snapshot = true; |
279 | |
280 | /// Initial size of the table |
281 | uint64_t min_table_size_; |
282 | |
283 | // Allocator for the hash buckets that don't fit in the hash table. |
284 | MallocFixedPageSize<HashBucket, disk_t> overflow_buckets_allocator_[2]; |
285 | |
  // An array of size two that contains the old and new versions of the hash table.
287 | InternalHashTable<disk_t> state_[2]; |
288 | |
289 | CheckpointLocks checkpoint_locks_; |
290 | |
291 | ResizeInfo resize_info_; |
292 | |
293 | AtomicSystemState system_state_; |
294 | |
295 | /// Checkpoint/recovery state. |
296 | CheckpointState<file_t> checkpoint_; |
297 | /// Garbage collection state. |
298 | GcState gc_; |
299 | /// Grow (hash table) state. |
300 | GrowState grow_; |
301 | |
302 | /// Global count of pending I/Os, used for throttling. |
303 | std::atomic<uint64_t> num_pending_ios; |
304 | |
305 | /// Space for two contexts per thread, stored inline. |
306 | ThreadContext thread_contexts_[Thread::kMaxNumThreads]; |
307 | }; |
308 | |
309 | // Implementations. |
310 | template <class K, class V, class D> |
311 | inline Guid FasterKv<K, V, D>::StartSession() { |
312 | SystemState state = system_state_.load(); |
313 | if(state.phase != Phase::REST) { |
314 | throw std::runtime_error{ "Can acquire only in REST phase!" }; |
315 | } |
316 | thread_ctx().Initialize(state.phase, state.version, Guid::Create(), 0); |
317 | Refresh(); |
318 | return thread_ctx().guid; |
319 | } |
320 | |
321 | template <class K, class V, class D> |
322 | inline uint64_t FasterKv<K, V, D>::ContinueSession(const Guid& session_id) { |
323 | auto iter = checkpoint_.continue_tokens.find(session_id); |
324 | if(iter == checkpoint_.continue_tokens.end()) { |
325 | throw std::invalid_argument{ "Unknown session ID" }; |
326 | } |
327 | |
328 | SystemState state = system_state_.load(); |
329 | if(state.phase != Phase::REST) { |
330 | throw std::runtime_error{ "Can continue only in REST phase!" }; |
331 | } |
332 | thread_ctx().Initialize(state.phase, state.version, session_id, iter->second); |
333 | Refresh(); |
334 | return iter->second; |
335 | } |
336 | |
337 | template <class K, class V, class D> |
338 | inline void FasterKv<K, V, D>::Refresh() { |
339 | epoch_.ProtectAndDrain(); |
  // Check whether both the thread and the system are in the normal (REST) phase.
341 | SystemState new_state = system_state_.load(); |
342 | if(thread_ctx().phase == Phase::REST && new_state.phase == Phase::REST) { |
343 | return; |
344 | } |
345 | HandleSpecialPhases(); |
346 | } |
347 | |
348 | template <class K, class V, class D> |
349 | inline void FasterKv<K, V, D>::StopSession() { |
350 | // If this thread is still involved in some activity, wait until it finishes. |
351 | while(thread_ctx().phase != Phase::REST || |
352 | !thread_ctx().pending_ios.empty() || |
353 | !thread_ctx().retry_requests.empty()) { |
354 | CompletePending(false); |
355 | std::this_thread::yield(); |
356 | } |
357 | |
358 | assert(thread_ctx().retry_requests.empty()); |
359 | assert(thread_ctx().pending_ios.empty()); |
360 | assert(thread_ctx().io_responses.empty()); |
361 | |
362 | assert(prev_thread_ctx().retry_requests.empty()); |
363 | assert(prev_thread_ctx().pending_ios.empty()); |
364 | assert(prev_thread_ctx().io_responses.empty()); |
365 | |
366 | assert(thread_ctx().phase == Phase::REST); |
367 | |
368 | epoch_.Unprotect(); |
369 | } |
370 | |
371 | template <class K, class V, class D> |
372 | inline const AtomicHashBucketEntry* FasterKv<K, V, D>::FindEntry(KeyHash hash) const { |
373 | // Truncate the hash to get a bucket page_index < state[version].size. |
374 | uint32_t version = resize_info_.version; |
375 | const HashBucket* bucket = &state_[version].bucket(hash); |
376 | assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0); |
377 | |
378 | while(true) { |
379 | // Search through the bucket looking for our key. Last entry is reserved |
380 | // for the overflow pointer. |
381 | for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) { |
382 | HashBucketEntry entry = bucket->entries[entry_idx].load(); |
383 | if(entry.unused()) { |
384 | continue; |
385 | } |
386 | if(hash.tag() == entry.tag()) { |
387 | // Found a matching tag. (So, the input hash matches the entry on 14 tag bits + |
388 | // log_2(table size) address bits.) |
389 | if(!entry.tentative()) { |
          // Not tentative, so this is a final entry; return it immediately.
391 | return &bucket->entries[entry_idx]; |
392 | } |
393 | } |
394 | } |
395 | |
396 | // Go to next bucket in the chain |
397 | HashBucketOverflowEntry entry = bucket->overflow_entry.load(); |
398 | if(entry.unused()) { |
399 | // No more buckets in the chain. |
400 | return nullptr; |
401 | } |
402 | bucket = &overflow_buckets_allocator_[version].Get(entry.address()); |
403 | assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0); |
404 | } |
405 | assert(false); |
406 | return nullptr; // NOT REACHED |
407 | } |
408 | |
409 | template <class K, class V, class D> |
410 | inline AtomicHashBucketEntry* FasterKv<K, V, D>::FindTentativeEntry(KeyHash hash, |
411 | HashBucket* bucket, |
412 | uint8_t version, HashBucketEntry& expected_entry) { |
413 | expected_entry = HashBucketEntry::kInvalidEntry; |
414 | AtomicHashBucketEntry* atomic_entry = nullptr; |
415 | // Try to find a slot that contains the right tag or that's free. |
416 | while(true) { |
417 | // Search through the bucket looking for our key. Last entry is reserved |
418 | // for the overflow pointer. |
419 | for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) { |
420 | HashBucketEntry entry = bucket->entries[entry_idx].load(); |
421 | if(entry.unused()) { |
422 | if(!atomic_entry) { |
423 | // Found a free slot; keep track of it, and continue looking for a match. |
424 | atomic_entry = &bucket->entries[entry_idx]; |
425 | } |
426 | continue; |
427 | } |
428 | if(hash.tag() == entry.tag() && !entry.tentative()) { |
429 | // Found a match. (So, the input hash matches the entry on 14 tag bits + |
430 | // log_2(table size) address bits.) Return it to caller. |
431 | expected_entry = entry; |
432 | return &bucket->entries[entry_idx]; |
433 | } |
434 | } |
435 | // Go to next bucket in the chain |
436 | HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load(); |
437 | if(overflow_entry.unused()) { |
438 | // No more buckets in the chain. |
439 | if(atomic_entry) { |
440 | // We found a free slot earlier (possibly inside an earlier bucket). |
441 | assert(expected_entry == HashBucketEntry::kInvalidEntry); |
442 | return atomic_entry; |
443 | } |
444 | // We didn't find any free slots, so allocate new bucket. |
445 | FixedPageAddress new_bucket_addr = overflow_buckets_allocator_[version].Allocate(); |
446 | bool success; |
447 | do { |
448 | HashBucketOverflowEntry new_bucket_entry{ new_bucket_addr }; |
449 | success = bucket->overflow_entry.compare_exchange_strong(overflow_entry, |
450 | new_bucket_entry); |
451 | } while(!success && overflow_entry.unused()); |
452 | if(!success) { |
453 | // Install failed, undo allocation; use the winner's entry |
454 | overflow_buckets_allocator_[version].FreeAtEpoch(new_bucket_addr, 0); |
455 | } else { |
456 | // Install succeeded; we have a new bucket on the chain. Return its first slot. |
457 | bucket = &overflow_buckets_allocator_[version].Get(new_bucket_addr); |
458 | assert(expected_entry == HashBucketEntry::kInvalidEntry); |
459 | return &bucket->entries[0]; |
460 | } |
461 | } |
462 | // Go to the next bucket. |
463 | bucket = &overflow_buckets_allocator_[version].Get(overflow_entry.address()); |
464 | assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0); |
465 | } |
466 | assert(false); |
467 | return nullptr; // NOT REACHED |
468 | } |
469 | |
470 | template <class K, class V, class D> |
471 | bool FasterKv<K, V, D>::HasConflictingEntry(KeyHash hash, const HashBucket* bucket, uint8_t version, |
472 | const AtomicHashBucketEntry* atomic_entry) const { |
473 | uint16_t tag = atomic_entry->load().tag(); |
474 | while(true) { |
475 | for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) { |
476 | HashBucketEntry entry = bucket->entries[entry_idx].load(); |
477 | if(entry != HashBucketEntry::kInvalidEntry && |
478 | entry.tag() == tag && |
479 | atomic_entry != &bucket->entries[entry_idx]) { |
480 | // Found a conflict. |
481 | return true; |
482 | } |
483 | } |
484 | // Go to next bucket in the chain |
485 | HashBucketOverflowEntry entry = bucket->overflow_entry.load(); |
486 | if(entry.unused()) { |
487 | // Reached the end of the bucket chain; no conflicts found. |
488 | return false; |
489 | } |
490 | // Go to the next bucket. |
491 | bucket = &overflow_buckets_allocator_[version].Get(entry.address()); |
492 | assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0); |
493 | } |
494 | } |
495 | |
496 | template <class K, class V, class D> |
497 | inline AtomicHashBucketEntry* FasterKv<K, V, D>::FindOrCreateEntry(KeyHash hash, |
498 | HashBucketEntry& expected_entry, HashBucket*& bucket) { |
499 | bucket = nullptr; |
500 | // Truncate the hash to get a bucket page_index < state[version].size. |
501 | uint32_t version = resize_info_.version; |
502 | assert(version <= 1); |
503 | |
504 | while(true) { |
505 | bucket = &state_[version].bucket(hash); |
506 | assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0); |
507 | |
508 | AtomicHashBucketEntry* atomic_entry = FindTentativeEntry(hash, bucket, version, |
509 | expected_entry); |
510 | if(expected_entry != HashBucketEntry::kInvalidEntry) { |
511 | // Found an existing hash bucket entry; nothing further to check. |
512 | return atomic_entry; |
513 | } |
514 | // We have a free slot. |
515 | assert(atomic_entry); |
516 | assert(expected_entry == HashBucketEntry::kInvalidEntry); |
517 | // Try to install tentative tag in free slot. |
518 | HashBucketEntry entry{ Address::kInvalidAddress, hash.tag(), true }; |
519 | if(atomic_entry->compare_exchange_strong(expected_entry, entry)) { |
520 | // See if some other thread is also trying to install this tag. |
521 | if(HasConflictingEntry(hash, bucket, version, atomic_entry)) { |
522 | // Back off and try again. |
523 | atomic_entry->store(HashBucketEntry::kInvalidEntry); |
524 | } else { |
525 | // No other thread was trying to install this tag, so we can clear our entry's "tentative" |
526 | // bit. |
527 | expected_entry = HashBucketEntry{ Address::kInvalidAddress, hash.tag(), false }; |
528 | atomic_entry->store(expected_entry); |
529 | return atomic_entry; |
530 | } |
531 | } |
532 | } |
533 | assert(false); |
534 | return nullptr; // NOT REACHED |
535 | } |
536 | |
537 | template <class K, class V, class D> |
538 | template <class RC> |
539 | inline Status FasterKv<K, V, D>::Read(RC& context, AsyncCallback callback, |
540 | uint64_t monotonic_serial_num) { |
541 | typedef RC read_context_t; |
542 | typedef PendingReadContext<RC> pending_read_context_t; |
543 | static_assert(std::is_base_of<value_t, typename read_context_t::value_t>::value, |
544 | "value_t is not a base class of read_context_t::value_t" ); |
545 | static_assert(alignof(value_t) == alignof(typename read_context_t::value_t), |
546 | "alignof(value_t) != alignof(typename read_context_t::value_t)" ); |
547 | |
548 | pending_read_context_t pending_context{ context, callback }; |
549 | OperationStatus internal_status = InternalRead(pending_context); |
550 | Status status; |
551 | if(internal_status == OperationStatus::SUCCESS) { |
552 | status = Status::Ok; |
553 | } else if(internal_status == OperationStatus::NOT_FOUND) { |
554 | status = Status::NotFound; |
555 | } else { |
556 | assert(internal_status == OperationStatus::RECORD_ON_DISK); |
557 | bool async; |
558 | status = HandleOperationStatus(thread_ctx(), pending_context, internal_status, async); |
559 | } |
560 | thread_ctx().serial_num = monotonic_serial_num; |
561 | return status; |
562 | } |
563 | |
564 | template <class K, class V, class D> |
565 | template <class UC> |
566 | inline Status FasterKv<K, V, D>::Upsert(UC& context, AsyncCallback callback, |
567 | uint64_t monotonic_serial_num) { |
568 | typedef UC upsert_context_t; |
569 | typedef PendingUpsertContext<UC> pending_upsert_context_t; |
570 | static_assert(std::is_base_of<value_t, typename upsert_context_t::value_t>::value, |
571 | "value_t is not a base class of upsert_context_t::value_t" ); |
572 | static_assert(alignof(value_t) == alignof(typename upsert_context_t::value_t), |
573 | "alignof(value_t) != alignof(typename upsert_context_t::value_t)" ); |
574 | |
575 | pending_upsert_context_t pending_context{ context, callback }; |
576 | OperationStatus internal_status = InternalUpsert(pending_context); |
577 | Status status; |
578 | |
579 | if(internal_status == OperationStatus::SUCCESS) { |
580 | status = Status::Ok; |
581 | } else { |
582 | bool async; |
583 | status = HandleOperationStatus(thread_ctx(), pending_context, internal_status, async); |
584 | } |
585 | thread_ctx().serial_num = monotonic_serial_num; |
586 | return status; |
587 | } |
588 | |
589 | template <class K, class V, class D> |
590 | template <class MC> |
591 | inline Status FasterKv<K, V, D>::Rmw(MC& context, AsyncCallback callback, |
592 | uint64_t monotonic_serial_num) { |
593 | typedef MC rmw_context_t; |
594 | typedef PendingRmwContext<MC> pending_rmw_context_t; |
595 | static_assert(std::is_base_of<value_t, typename rmw_context_t::value_t>::value, |
596 | "value_t is not a base class of rmw_context_t::value_t" ); |
597 | static_assert(alignof(value_t) == alignof(typename rmw_context_t::value_t), |
598 | "alignof(value_t) != alignof(typename rmw_context_t::value_t)" ); |
599 | |
600 | pending_rmw_context_t pending_context{ context, callback }; |
601 | OperationStatus internal_status = InternalRmw(pending_context, false); |
602 | Status status; |
603 | if(internal_status == OperationStatus::SUCCESS) { |
604 | status = Status::Ok; |
605 | } else { |
606 | bool async; |
607 | status = HandleOperationStatus(thread_ctx(), pending_context, internal_status, async); |
608 | } |
609 | thread_ctx().serial_num = monotonic_serial_num; |
610 | return status; |
611 | } |
612 | |
613 | template <class K, class V, class D> |
614 | inline bool FasterKv<K, V, D>::CompletePending(bool wait) { |
615 | do { |
616 | disk.TryComplete(); |
617 | |
618 | bool done = true; |
619 | if(thread_ctx().phase != Phase::WAIT_PENDING && thread_ctx().phase != Phase::IN_PROGRESS) { |
620 | CompleteIoPendingRequests(thread_ctx()); |
621 | } |
622 | Refresh(); |
623 | CompleteRetryRequests(thread_ctx()); |
624 | |
625 | done = (thread_ctx().pending_ios.empty() && thread_ctx().retry_requests.empty()); |
626 | |
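    // During a checkpoint, operations may also have gone pending under the previous execution
    // context (the one from before the version boundary); drain those too. We're not done until
    // the phase returns to REST.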
627 | if(thread_ctx().phase != Phase::REST) { |
628 | CompleteIoPendingRequests(prev_thread_ctx()); |
629 | Refresh(); |
630 | CompleteRetryRequests(prev_thread_ctx()); |
631 | done = false; |
632 | } |
633 | if(done) { |
634 | return true; |
635 | } |
636 | } while(wait); |
637 | return false; |
638 | } |
639 | |
640 | template <class K, class V, class D> |
641 | inline void FasterKv<K, V, D>::CompleteIoPendingRequests(ExecutionContext& context) { |
642 | AsyncIOContext* ctxt; |
643 | // Clear this thread's I/O response queue. (Does not clear I/Os issued by this thread that have |
644 | // not yet completed.) |
645 | while(context.io_responses.try_pop(ctxt)) { |
646 | CallbackContext<AsyncIOContext> io_context{ ctxt }; |
647 | CallbackContext<pending_context_t> pending_context{ io_context->caller_context }; |
648 | // This I/O is no longer pending, since we popped its response off the queue. |
649 | auto pending_io = context.pending_ios.find(io_context->io_id); |
650 | assert(pending_io != context.pending_ios.end()); |
651 | context.pending_ios.erase(pending_io); |
652 | |
653 | // Issue the continue command |
654 | OperationStatus internal_status; |
655 | if(pending_context->type == OperationType::Read) { |
656 | internal_status = InternalContinuePendingRead(context, *io_context.get()); |
657 | } else { |
658 | assert(pending_context->type == OperationType::RMW); |
659 | internal_status = InternalContinuePendingRmw(context, *io_context.get()); |
660 | } |
661 | Status result; |
662 | if(internal_status == OperationStatus::SUCCESS) { |
663 | result = Status::Ok; |
664 | } else if(internal_status == OperationStatus::NOT_FOUND) { |
665 | result = Status::NotFound; |
666 | } else { |
667 | result = HandleOperationStatus(context, *pending_context.get(), internal_status, |
668 | pending_context.async); |
669 | } |
670 | if(!pending_context.async) { |
671 | pending_context->caller_callback(pending_context->caller_context, result); |
672 | } |
673 | } |
674 | } |
675 | |
676 | template <class K, class V, class D> |
677 | inline void FasterKv<K, V, D>::CompleteRetryRequests(ExecutionContext& context) { |
678 | // If we can't complete a request, it will be pushed back onto the deque. Retry each request |
679 | // only once. |
680 | size_t size = context.retry_requests.size(); |
681 | for(size_t idx = 0; idx < size; ++idx) { |
682 | CallbackContext<pending_context_t> pending_context{ context.retry_requests.front() }; |
683 | context.retry_requests.pop_front(); |
684 | // Issue retry command |
685 | OperationStatus internal_status; |
686 | switch(pending_context->type) { |
687 | case OperationType::RMW: |
688 | internal_status = InternalRetryPendingRmw( |
689 | *static_cast<async_pending_rmw_context_t*>(pending_context.get())); |
690 | break; |
691 | case OperationType::Upsert: |
692 | internal_status = InternalUpsert( |
693 | *static_cast<async_pending_upsert_context_t*>(pending_context.get())); |
694 | break; |
695 | default: |
696 | assert(false); |
697 | throw std::runtime_error{ "Cannot happen!" }; |
698 | } |
699 | // Handle operation status |
700 | Status result; |
701 | if(internal_status == OperationStatus::SUCCESS) { |
702 | result = Status::Ok; |
703 | } else { |
704 | result = HandleOperationStatus(context, *pending_context.get(), internal_status, |
705 | pending_context.async); |
706 | } |
707 | |
708 | // If done, callback user code. |
709 | if(!pending_context.async) { |
710 | pending_context->caller_callback(pending_context->caller_context, result); |
711 | } |
712 | } |
713 | } |
714 | |
715 | template <class K, class V, class D> |
716 | template <class C> |
717 | inline OperationStatus FasterKv<K, V, D>::InternalRead(C& pending_context) const { |
718 | typedef C pending_read_context_t; |
719 | |
720 | if(thread_ctx().phase != Phase::REST) { |
721 | const_cast<faster_t*>(this)->HeavyEnter(); |
722 | } |
723 | |
724 | const key_t& key = pending_context.key(); |
725 | KeyHash hash = key.GetHash(); |
726 | const AtomicHashBucketEntry* atomic_entry = FindEntry(hash); |
727 | if(!atomic_entry) { |
728 | // no record found |
729 | return OperationStatus::NOT_FOUND; |
730 | } |
731 | |
732 | HashBucketEntry entry = atomic_entry->load(); |
733 | Address address = entry.address(); |
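  // Snapshot the hybrid log's region boundaries. In increasing address order:
  // begin_address <= head_address <= safe_read_only_address <= read_only_address <= tail.
  // The cases below handle the record from the newest region (mutable/fuzzy, in memory) down to
  // records that live only on disk.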
734 | Address begin_address = hlog.begin_address.load(); |
735 | Address head_address = hlog.head_address.load(); |
736 | Address safe_read_only_address = hlog.safe_read_only_address.load(); |
737 | Address read_only_address = hlog.read_only_address.load(); |
738 | uint64_t latest_record_version = 0; |
739 | |
740 | if(address >= head_address) { |
741 | // Look through the in-memory portion of the log, to find the first record (if any) whose key |
742 | // matches. |
743 | const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address)); |
744 | latest_record_version = record->header.checkpoint_version; |
745 | if(key != record->key()) { |
746 | address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); |
747 | } |
748 | } |
749 | |
750 | switch(thread_ctx().phase) { |
751 | case Phase::PREPARE: |
752 | // Reading old version (v). |
753 | if(latest_record_version > thread_ctx().version) { |
754 | // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than |
755 | // what we've seen. |
756 | pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, entry); |
757 | return OperationStatus::CPR_SHIFT_DETECTED; |
758 | } |
759 | break; |
760 | default: |
761 | break; |
762 | } |
763 | |
764 | if(address >= safe_read_only_address) { |
765 | // Mutable or fuzzy region |
766 | // concurrent read |
767 | pending_context.GetAtomic(hlog.Get(address)); |
768 | return OperationStatus::SUCCESS; |
769 | } else if(address >= head_address) { |
770 | // Immutable region |
771 | // single-thread read |
772 | pending_context.Get(hlog.Get(address)); |
773 | return OperationStatus::SUCCESS; |
774 | } else if(address >= begin_address) { |
775 | // Record not available in-memory |
776 | pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, entry); |
777 | return OperationStatus::RECORD_ON_DISK; |
778 | } else { |
779 | // No record found |
780 | return OperationStatus::NOT_FOUND; |
781 | } |
782 | } |
783 | |
784 | template <class K, class V, class D> |
785 | template <class C> |
786 | inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) { |
787 | typedef C pending_upsert_context_t; |
788 | |
789 | if(thread_ctx().phase != Phase::REST) { |
790 | HeavyEnter(); |
791 | } |
792 | |
793 | const key_t& key = pending_context.key(); |
794 | KeyHash hash = key.GetHash(); |
795 | HashBucketEntry expected_entry; |
796 | HashBucket* bucket; |
797 | AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket); |
798 | |
  // (Note that address will be Address::kInvalidAddress if the atomic_entry was just created.)
800 | Address address = expected_entry.address(); |
801 | Address head_address = hlog.head_address.load(); |
802 | Address read_only_address = hlog.read_only_address.load(); |
803 | uint64_t latest_record_version = 0; |
804 | |
805 | if(address >= head_address) { |
806 | // Multiple keys may share the same hash. Try to find the most recent record with a matching |
807 | // key that we might be able to update in place. |
808 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
809 | latest_record_version = record->header.checkpoint_version; |
810 | if(key != record->key()) { |
811 | address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); |
812 | } |
813 | } |
814 | |
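  // During a CPR checkpoint, per-hash checkpoint locks keep old-version (v) and new-version (v+1)
  // updates to the same key from interleaving: threads still in PREPARE take the "old" lock, while
  // threads that have advanced past the version boundary take the "new" lock (see the phase
  // switch below).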
815 | CheckpointLockGuard lock_guard{ checkpoint_locks_, hash }; |
816 | |
817 | // The common case |
818 | if(thread_ctx().phase == Phase::REST && address >= read_only_address) { |
819 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
820 | if(pending_context.PutAtomic(record)) { |
821 | return OperationStatus::SUCCESS; |
822 | } else { |
823 | // Must retry as RCU. |
824 | goto create_record; |
825 | } |
826 | } |
827 | |
828 | // Acquire necessary locks. |
829 | switch(thread_ctx().phase) { |
830 | case Phase::PREPARE: |
831 | // Working on old version (v). |
832 | if(!lock_guard.try_lock_old()) { |
833 | pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry); |
834 | return OperationStatus::CPR_SHIFT_DETECTED; |
835 | } else { |
836 | if(latest_record_version > thread_ctx().version) { |
837 | // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than |
838 | // what we've seen. |
839 | pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, |
840 | expected_entry); |
841 | return OperationStatus::CPR_SHIFT_DETECTED; |
842 | } |
843 | } |
844 | break; |
845 | case Phase::IN_PROGRESS: |
846 | // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}. |
847 | if(latest_record_version < thread_ctx().version) { |
848 | // Will create new record or update existing record to new version (v+1). |
849 | if(!lock_guard.try_lock_new()) { |
850 | pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, |
851 | expected_entry); |
852 | return OperationStatus::RETRY_LATER; |
853 | } else { |
854 | // Update to new version (v+1) requires RCU. |
855 | goto create_record; |
856 | } |
857 | } |
858 | break; |
859 | case Phase::WAIT_PENDING: |
860 | // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}. |
861 | if(latest_record_version < thread_ctx().version) { |
862 | if(lock_guard.old_locked()) { |
863 | pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, |
864 | expected_entry); |
865 | return OperationStatus::RETRY_LATER; |
866 | } else { |
867 | // Update to new version (v+1) requires RCU. |
868 | goto create_record; |
869 | } |
870 | } |
871 | break; |
872 | case Phase::WAIT_FLUSH: |
873 | // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}. |
874 | if(latest_record_version < thread_ctx().version) { |
875 | goto create_record; |
876 | } |
877 | break; |
878 | default: |
879 | break; |
880 | } |
881 | |
882 | if(address >= read_only_address) { |
883 | // Mutable region; try to update in place. |
884 | if(atomic_entry->load() != expected_entry) { |
885 | // Some other thread may have RCUed the record before we locked it; try again. |
886 | return OperationStatus::RETRY_NOW; |
887 | } |
    // We acquired the necessary locks, so we can try to update the record in place, atomically.
889 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
890 | if(pending_context.PutAtomic(record)) { |
      // Successfully updated the record in place, atomically.
892 | return OperationStatus::SUCCESS; |
893 | } else { |
894 | // Must retry as RCU. |
895 | goto create_record; |
896 | } |
897 | } |
898 | |
899 | // Create a record and attempt RCU. |
900 | create_record: |
901 | uint32_t record_size = record_t::size(key, pending_context.value_size()); |
902 | Address new_address = BlockAllocate(record_size); |
903 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address)); |
904 | new(record) record_t{ |
905 | RecordInfo{ |
906 | static_cast<uint16_t>(thread_ctx().version), true, false, false, |
907 | expected_entry.address() }, |
908 | key }; |
909 | pending_context.Put(record); |
910 | |
911 | HashBucketEntry updated_entry{ new_address, hash.tag(), false }; |
912 | |
913 | if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) { |
914 | // Installed the new record in the hash table. |
915 | return OperationStatus::SUCCESS; |
916 | } else { |
917 | // Try again. |
918 | record->header.invalid = true; |
919 | return InternalUpsert(pending_context); |
920 | } |
921 | } |
922 | |
923 | template <class K, class V, class D> |
924 | template <class C> |
925 | inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool retrying) { |
926 | typedef C pending_rmw_context_t; |
927 | |
928 | Phase phase = retrying ? pending_context.phase : thread_ctx().phase; |
929 | uint32_t version = retrying ? pending_context.version : thread_ctx().version; |
930 | |
931 | if(phase != Phase::REST) { |
932 | HeavyEnter(); |
933 | } |
934 | |
935 | const key_t& key = pending_context.key(); |
936 | KeyHash hash = key.GetHash(); |
937 | HashBucketEntry expected_entry; |
938 | HashBucket* bucket; |
939 | AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket); |
940 | |
  // (Note that address will be Address::kInvalidAddress if the atomic_entry was just created.)
942 | Address address = expected_entry.address(); |
943 | Address begin_address = hlog.begin_address.load(); |
944 | Address head_address = hlog.head_address.load(); |
945 | Address read_only_address = hlog.read_only_address.load(); |
946 | Address safe_read_only_address = hlog.safe_read_only_address.load(); |
947 | uint64_t latest_record_version = 0; |
948 | |
949 | if(address >= head_address) { |
950 | // Multiple keys may share the same hash. Try to find the most recent record with a matching |
951 | // key that we might be able to update in place. |
952 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
953 | latest_record_version = record->header.checkpoint_version; |
954 | if(key != record->key()) { |
955 | address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); |
956 | } |
957 | } |
958 | |
959 | CheckpointLockGuard lock_guard{ checkpoint_locks_, hash }; |
960 | |
961 | // The common case. |
962 | if(phase == Phase::REST && address >= read_only_address) { |
963 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
964 | if(pending_context.RmwAtomic(record)) { |
965 | // In-place RMW succeeded. |
966 | return OperationStatus::SUCCESS; |
967 | } else { |
968 | // Must retry as RCU. |
969 | goto create_record; |
970 | } |
971 | } |
972 | |
973 | // Acquire necessary locks. |
974 | switch(phase) { |
975 | case Phase::PREPARE: |
976 | // Working on old version (v). |
977 | if(!lock_guard.try_lock_old()) { |
978 | // If we're retrying the operation, then we already have an old lock, so we'll always |
979 | // succeed in obtaining a second. Otherwise, another thread has acquired the new lock, so |
980 | // a CPR shift has occurred. |
981 | assert(!retrying); |
982 | pending_context.go_async(phase, version, address, expected_entry); |
983 | return OperationStatus::CPR_SHIFT_DETECTED; |
984 | } else { |
985 | if(latest_record_version > version) { |
986 | // CPR shift detected: we are in the "PREPARE" phase, and a mutable record has a version |
987 | // later than what we've seen. |
988 | assert(!retrying); |
989 | pending_context.go_async(phase, version, address, expected_entry); |
990 | return OperationStatus::CPR_SHIFT_DETECTED; |
991 | } |
992 | } |
993 | break; |
994 | case Phase::IN_PROGRESS: |
995 | // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}. |
996 | if(latest_record_version < version) { |
997 | // Will create new record or update existing record to new version (v+1). |
998 | if(!lock_guard.try_lock_new()) { |
999 | if(!retrying) { |
1000 | pending_context.go_async(phase, version, address, expected_entry); |
1001 | } else { |
1002 | pending_context.continue_async(address, expected_entry); |
1003 | } |
1004 | return OperationStatus::RETRY_LATER; |
1005 | } else { |
1006 | // Update to new version (v+1) requires RCU. |
1007 | goto create_record; |
1008 | } |
1009 | } |
1010 | break; |
1011 | case Phase::WAIT_PENDING: |
1012 | // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}. |
1013 | if(latest_record_version < version) { |
1014 | if(lock_guard.old_locked()) { |
1015 | if(!retrying) { |
1016 | pending_context.go_async(phase, version, address, expected_entry); |
1017 | } else { |
1018 | pending_context.continue_async(address, expected_entry); |
1019 | } |
1020 | return OperationStatus::RETRY_LATER; |
1021 | } else { |
1022 | // Update to new version (v+1) requires RCU. |
1023 | goto create_record; |
1024 | } |
1025 | } |
1026 | break; |
1027 | case Phase::WAIT_FLUSH: |
1028 | // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}. |
1029 | if(latest_record_version < version) { |
1030 | goto create_record; |
1031 | } |
1032 | break; |
1033 | default: |
1034 | break; |
1035 | } |
1036 | |
1037 | if(address >= read_only_address) { |
1038 | // Mutable region. Try to update in place. |
1039 | if(atomic_entry->load() != expected_entry) { |
1040 | // Some other thread may have RCUed the record before we locked it; try again. |
1041 | return OperationStatus::RETRY_NOW; |
1042 | } |
    // We acquired the necessary locks, so we can try to update the record in place, atomically.
1044 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
1045 | if(pending_context.RmwAtomic(record)) { |
1046 | // In-place RMW succeeded. |
1047 | return OperationStatus::SUCCESS; |
1048 | } else { |
1049 | // Must retry as RCU. |
1050 | goto create_record; |
1051 | } |
1052 | } else if(address >= safe_read_only_address) { |
1053 | // Fuzzy Region: Must go pending due to lost-update anomaly |
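    // (Another thread that still sees this region as mutable may be updating the record in place;
    // copying it to the tail now could silently drop that update.)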
1054 | if(!retrying) { |
1055 | pending_context.go_async(phase, version, address, expected_entry); |
1056 | } else { |
1057 | pending_context.continue_async(address, expected_entry); |
1058 | } |
1059 | return OperationStatus::RETRY_LATER; |
1060 | } else if(address >= head_address) { |
1061 | goto create_record; |
1062 | } else if(address >= begin_address) { |
1063 | // Need to obtain old record from disk. |
1064 | if(!retrying) { |
1065 | pending_context.go_async(phase, version, address, expected_entry); |
1066 | } else { |
1067 | pending_context.continue_async(address, expected_entry); |
1068 | } |
1069 | return OperationStatus::RECORD_ON_DISK; |
1070 | } else { |
1071 | // Create a new record. |
1072 | goto create_record; |
1073 | } |
1074 | |
1075 | // Create a record and attempt RCU. |
1076 | create_record: |
1077 | uint32_t record_size = record_t::size(key, pending_context.value_size()); |
1078 | Address new_address = BlockAllocate(record_size); |
1079 | record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address)); |
1080 | |
1081 | // Allocating a block may have the side effect of advancing the head address. |
1082 | head_address = hlog.head_address.load(); |
1083 | // Allocating a block may have the side effect of advancing the thread context's version and |
1084 | // phase. |
1085 | if(!retrying) { |
1086 | phase = thread_ctx().phase; |
1087 | version = thread_ctx().version; |
1088 | } |
1089 | |
1090 | new(new_record) record_t{ |
1091 | RecordInfo{ |
1092 | static_cast<uint16_t>(version), true, false, false, |
1093 | expected_entry.address() }, |
1094 | key }; |
1095 | if(address < hlog.begin_address.load()) { |
1096 | pending_context.RmwInitial(new_record); |
1097 | } else if(address >= head_address) { |
1098 | const record_t* old_record = reinterpret_cast<const record_t*>(hlog.Get(address)); |
1099 | pending_context.RmwCopy(old_record, new_record); |
1100 | } else { |
1101 | // The block we allocated for the new record caused the head address to advance beyond |
1102 | // the old record. Need to obtain the old record from disk. |
1103 | new_record->header.invalid = true; |
1104 | if(!retrying) { |
1105 | pending_context.go_async(phase, version, address, expected_entry); |
1106 | } else { |
1107 | pending_context.continue_async(address, expected_entry); |
1108 | } |
1109 | return OperationStatus::RECORD_ON_DISK; |
1110 | } |
1111 | |
1112 | HashBucketEntry updated_entry{ new_address, hash.tag(), false }; |
1113 | if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) { |
1114 | return OperationStatus::SUCCESS; |
1115 | } else { |
1116 | // CAS failed; try again. |
1117 | new_record->header.invalid = true; |
1118 | if(!retrying) { |
1119 | pending_context.go_async(phase, version, address, expected_entry); |
1120 | } else { |
1121 | pending_context.continue_async(address, expected_entry); |
1122 | } |
1123 | return OperationStatus::RETRY_NOW; |
1124 | } |
1125 | } |
1126 | |
1127 | template <class K, class V, class D> |
1128 | inline OperationStatus FasterKv<K, V, D>::InternalRetryPendingRmw( |
1129 | async_pending_rmw_context_t& pending_context) { |
1130 | OperationStatus status = InternalRmw(pending_context, true); |
1131 | if(status == OperationStatus::SUCCESS && pending_context.version != thread_ctx().version) { |
1132 | status = OperationStatus::SUCCESS_UNMARK; |
1133 | } |
1134 | return status; |
1135 | } |
1136 | |
1137 | template <class K, class V, class D> |
1138 | inline Address FasterKv<K, V, D>::TraceBackForKeyMatch(const key_t& key, Address from_address, |
1139 | Address min_offset) const { |
1140 | while(from_address >= min_offset) { |
1141 | const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address)); |
1142 | if(key == record->key()) { |
1143 | return from_address; |
1144 | } else { |
1145 | from_address = record->header.previous_address(); |
1146 | continue; |
1147 | } |
1148 | } |
1149 | return from_address; |
1150 | } |
1151 | |
1152 | template <class K, class V, class D> |
1153 | inline Status FasterKv<K, V, D>::HandleOperationStatus(ExecutionContext& ctx, |
1154 | pending_context_t& pending_context, OperationStatus internal_status, bool& async) { |
1155 | async = false; |
1156 | switch(internal_status) { |
1157 | case OperationStatus::RETRY_NOW: |
1158 | switch(pending_context.type) { |
1159 | case OperationType::Read: { |
1160 | async_pending_read_context_t& read_context = |
1161 | *static_cast<async_pending_read_context_t*>(&pending_context); |
1162 | internal_status = InternalRead(read_context); |
1163 | break; |
1164 | } |
1165 | case OperationType::Upsert: { |
1166 | async_pending_upsert_context_t& upsert_context = |
1167 | *static_cast<async_pending_upsert_context_t*>(&pending_context); |
1168 | internal_status = InternalUpsert(upsert_context); |
1169 | break; |
1170 | } |
1171 | case OperationType::RMW: { |
1172 | async_pending_rmw_context_t& rmw_context = |
1173 | *static_cast<async_pending_rmw_context_t*>(&pending_context); |
1174 | internal_status = InternalRmw(rmw_context, false); |
1175 | break; |
1176 | } |
1177 | } |
1178 | |
1179 | if(internal_status == OperationStatus::SUCCESS) { |
1180 | return Status::Ok; |
1181 | } else { |
1182 | return HandleOperationStatus(ctx, pending_context, internal_status, async); |
1183 | } |
1184 | case OperationStatus::RETRY_LATER: |
1185 | if(thread_ctx().phase == Phase::PREPARE) { |
1186 | assert(pending_context.type == OperationType::RMW); |
1187 | // Can I be marking an operation again and again? |
1188 | if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) { |
1189 | return PivotAndRetry(ctx, pending_context, async); |
1190 | } |
1191 | } |
1192 | return RetryLater(ctx, pending_context, async); |
1193 | case OperationStatus::RECORD_ON_DISK: |
1194 | if(thread_ctx().phase == Phase::PREPARE) { |
1195 | assert(pending_context.type == OperationType::Read || |
1196 | pending_context.type == OperationType::RMW); |
1197 | // Can I be marking an operation again and again? |
1198 | if(!checkpoint_locks_.get_lock(pending_context.key().GetHash()).try_lock_old()) { |
1199 | return PivotAndRetry(ctx, pending_context, async); |
1200 | } |
1201 | } |
1202 | return IssueAsyncIoRequest(ctx, pending_context, async); |
1203 | case OperationStatus::SUCCESS_UNMARK: |
1204 | checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old(); |
1205 | return Status::Ok; |
1206 | case OperationStatus::NOT_FOUND_UNMARK: |
1207 | checkpoint_locks_.get_lock(pending_context.key().GetHash()).unlock_old(); |
1208 | return Status::NotFound; |
1209 | case OperationStatus::CPR_SHIFT_DETECTED: |
1210 | return PivotAndRetry(ctx, pending_context, async); |
1211 | } |
1212 | // not reached |
1213 | assert(false); |
1214 | return Status::Corruption; |
1215 | } |
1216 | |
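/// Called when a CPR version shift is detected while this thread is still in the PREPARE phase:
/// Refresh() advances the thread to IN_PROGRESS (and to version v+1), after which the operation is
/// retried under the new version.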
1217 | template <class K, class V, class D> |
1218 | inline Status FasterKv<K, V, D>::PivotAndRetry(ExecutionContext& ctx, |
1219 | pending_context_t& pending_context, bool& async) { |
1220 | // Some invariants |
1221 | assert(ctx.version == thread_ctx().version); |
1222 | assert(thread_ctx().phase == Phase::PREPARE); |
1223 | Refresh(); |
1224 | // thread must have moved to IN_PROGRESS phase |
1225 | assert(thread_ctx().version == ctx.version + 1); |
1226 | // retry with new contexts |
1227 | pending_context.phase = thread_ctx().phase; |
1228 | pending_context.version = thread_ctx().version; |
1229 | return HandleOperationStatus(thread_ctx(), pending_context, OperationStatus::RETRY_NOW, async); |
1230 | } |
1231 | |
1232 | template <class K, class V, class D> |
1233 | inline Status FasterKv<K, V, D>::RetryLater(ExecutionContext& ctx, |
1234 | pending_context_t& pending_context, bool& async) { |
1235 | IAsyncContext* context_copy; |
1236 | Status result = pending_context.DeepCopy(context_copy); |
1237 | if(result == Status::Ok) { |
1238 | async = true; |
1239 | ctx.retry_requests.push_back(context_copy); |
1240 | return Status::Pending; |
1241 | } else { |
1242 | async = false; |
1243 | return result; |
1244 | } |
1245 | } |
1246 | |
1247 | template <class K, class V, class D> |
1248 | inline constexpr uint32_t FasterKv<K, V, D>::MinIoRequestSize() const { |
1249 | return static_cast<uint32_t>( |
1250 | sizeof(value_t) + pad_alignment(record_t::min_disk_key_size(), |
1251 | alignof(value_t))); |
1252 | } |
1253 | |
1254 | template <class K, class V, class D> |
1255 | inline Status FasterKv<K, V, D>::IssueAsyncIoRequest(ExecutionContext& ctx, |
1256 | pending_context_t& pending_context, bool& async) { |
1257 | // Issue asynchronous I/O request |
1258 | uint64_t io_id = thread_ctx().io_id++; |
1259 | thread_ctx().pending_ios.insert({ io_id, pending_context.key().GetHash() }); |
1260 | async = true; |
1261 | AsyncIOContext io_request{ this, pending_context.address, &pending_context, |
1262 | &thread_ctx().io_responses, io_id }; |
1263 | AsyncGetFromDisk(pending_context.address, MinIoRequestSize(), AsyncGetFromDiskCallback, |
1264 | io_request); |
1265 | return Status::Pending; |
1266 | } |
1267 | |
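/// Allocate space for a new record at the hybrid log's tail. If the current page is full, keep
/// refreshing this thread's epoch entry while waiting for a new page to be opened.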
1268 | template <class K, class V, class D> |
1269 | inline Address FasterKv<K, V, D>::BlockAllocate(uint32_t record_size) { |
1270 | uint32_t page; |
1271 | Address retval = hlog.Allocate(record_size, page); |
1272 | while(retval < hlog.read_only_address.load()) { |
1273 | Refresh(); |
1274 | // Don't overrun the hlog's tail offset. |
1275 | bool page_closed = (retval == Address::kInvalidAddress); |
1276 | while(page_closed) { |
1277 | page_closed = !hlog.NewPage(page); |
1278 | Refresh(); |
1279 | } |
1280 | retval = hlog.Allocate(record_size, page); |
1281 | } |
1282 | return retval; |
1283 | } |
1284 | |
1285 | template <class K, class V, class D> |
1286 | void FasterKv<K, V, D>::AsyncGetFromDisk(Address address, uint32_t num_records, |
1287 | AsyncIOCallback callback, AsyncIOContext& context) { |
1288 | if(epoch_.IsProtected()) { |
    /// Throttling: if too many I/Os are outstanding, wait for some to drain before issuing more.
    /// (Thread-pool and other unprotected threads are not throttled.)
1290 | while(num_pending_ios.load() > 120) { |
1291 | disk.TryComplete(); |
1292 | std::this_thread::yield(); |
1293 | epoch_.ProtectAndDrain(); |
1294 | } |
1295 | } |
1296 | ++num_pending_ios; |
1297 | hlog.AsyncGetFromDisk(address, num_records, callback, context); |
1298 | } |
1299 | |
1300 | template <class K, class V, class D> |
1301 | void FasterKv<K, V, D>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result, |
1302 | size_t bytes_transferred) { |
1303 | CallbackContext<AsyncIOContext> context{ ctxt }; |
1304 | faster_t* faster = reinterpret_cast<faster_t*>(context->faster); |
1305 | /// Context stack is: AsyncIOContext, PendingContext. |
1306 | pending_context_t* pending_context = static_cast<pending_context_t*>(context->caller_context); |
1307 | |
1308 | /// This I/O is finished. |
1309 | --faster->num_pending_ios; |
  /// Always "goes async": the context is freed by the issuing thread, when that thread processes
  /// its I/O responses.
1312 | context.async = true; |
1313 | |
1314 | pending_context->result = result; |
1315 | if(result == Status::Ok) { |
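    // The initial read may cover only part of the record. Each branch below re-issues the read
    // with a larger length as more of the record's layout (key size, then value size, then the
    // full record size) becomes known.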
1316 | record_t* record = reinterpret_cast<record_t*>(context->record.GetValidPointer()); |
    // Number of bytes of the record that have been read from disk so far (we might not have read
    // the entire record yet).
1318 | size_t record_size = context->record.available_bytes; |
1319 | if(record->min_disk_key_size() > record_size) { |
1320 | // Haven't read the full record in yet; I/O is not complete! |
1321 | faster->AsyncGetFromDisk(context->address, record->min_disk_key_size(), |
1322 | AsyncGetFromDiskCallback, *context.get()); |
1323 | context.async = true; |
1324 | } else if(record->min_disk_value_size() > record_size) { |
1325 | // Haven't read the full record in yet; I/O is not complete! |
1326 | faster->AsyncGetFromDisk(context->address, record->min_disk_value_size(), |
1327 | AsyncGetFromDiskCallback, *context.get()); |
1328 | context.async = true; |
1329 | } else if(record->disk_size() > record_size) { |
1330 | // Haven't read the full record in yet; I/O is not complete! |
1331 | faster->AsyncGetFromDisk(context->address, record->disk_size(), |
1332 | AsyncGetFromDiskCallback, *context.get()); |
1333 | context.async = true; |
1334 | } else if(pending_context->key() == record->key()) { |
      // The keys match, so the I/O is complete.
1336 | context->thread_io_responses->push(context.get()); |
1337 | } else { |
      // The keys don't match; keep tracing back through the hash chain.
1339 | context->address = record->header.previous_address(); |
1340 | if(context->address >= faster->hlog.begin_address.load()) { |
1341 | faster->AsyncGetFromDisk(context->address, faster->MinIoRequestSize(), |
1342 | AsyncGetFromDiskCallback, *context.get()); |
1343 | context.async = true; |
1344 | } else { |
1345 | // Record not found, so I/O is complete. |
1346 | context->thread_io_responses->push(context.get()); |
1347 | } |
1348 | } |
1349 | } |
1350 | } |
1351 | |
1352 | template <class K, class V, class D> |
1353 | OperationStatus FasterKv<K, V, D>::InternalContinuePendingRead(ExecutionContext& context, |
1354 | AsyncIOContext& io_context) { |
1355 | if(io_context.address >= hlog.begin_address.load()) { |
1356 | async_pending_read_context_t* pending_context = static_cast<async_pending_read_context_t*>( |
1357 | io_context.caller_context); |
1358 | record_t* record = reinterpret_cast<record_t*>(io_context.record.GetValidPointer()); |
1359 | pending_context->Get(record); |
1360 | assert(!kCopyReadsToTail); |
1361 | return (thread_ctx().version > context.version) ? OperationStatus::SUCCESS_UNMARK : |
1362 | OperationStatus::SUCCESS; |
1363 | } else { |
1364 | return (thread_ctx().version > context.version) ? OperationStatus::NOT_FOUND_UNMARK : |
1365 | OperationStatus::NOT_FOUND; |
1366 | } |
1367 | } |
1368 | |
1369 | template <class K, class V, class D> |
1370 | OperationStatus FasterKv<K, V, D>::InternalContinuePendingRmw(ExecutionContext& context, |
1371 | AsyncIOContext& io_context) { |
1372 | async_pending_rmw_context_t* pending_context = static_cast<async_pending_rmw_context_t*>( |
1373 | io_context.caller_context); |
1374 | |
1375 | // Find a hash bucket entry to store the updated value in. |
1376 | const key_t& key = pending_context->key(); |
1377 | KeyHash hash = key.GetHash(); |
1378 | HashBucketEntry expected_entry; |
1379 | HashBucket* bucket; |
1380 | AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket); |
1381 | |
1382 | // (Note that address will be Address::kInvalidAddress, if the atomic_entry was created.) |
1383 | Address address = expected_entry.address(); |
1384 | Address head_address = hlog.head_address.load(); |
1385 | |
1386 | // Make sure that atomic_entry is OK to update. |
1387 | if(address >= head_address) { |
1388 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
1389 | if(key != record->key()) { |
1390 | address = TraceBackForKeyMatch(key, record->header.previous_address(), head_address); |
1391 | } |
1392 | } |
1393 | |
1394 | if(address > pending_context->entry.address()) { |
1395 | // We can't trace the current hash bucket entry back to the record we read. |
1396 | pending_context->continue_async(address, expected_entry); |
1397 | return OperationStatus::RETRY_NOW; |
1398 | } |
1399 | assert(address < hlog.begin_address.load() || address == pending_context->entry.address()); |
1400 | |
1401 | // We have to do copy-on-write/RCU and write the updated value to the tail of the log. |
1402 | uint32_t record_size = record_t::size(key, pending_context->value_size()); |
1403 | Address new_address = BlockAllocate(record_size); |
1404 | record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address)); |
1405 | |
1406 | new(new_record) record_t{ |
1407 | RecordInfo{ |
1408 | static_cast<uint16_t>(context.version), true, false, false, |
1409 | expected_entry.address() }, |
1410 | key }; |
1411 | if(io_context.address < hlog.begin_address.load()) { |
1412 | // The on-disk trace back failed to find a key match. |
1413 | pending_context->RmwInitial(new_record); |
1414 | } else { |
1415 | // The record we read from disk. |
1416 | const record_t* disk_record = reinterpret_cast<const record_t*>( |
1417 | io_context.record.GetValidPointer()); |
1418 | pending_context->RmwCopy(disk_record, new_record); |
1419 | } |
1420 | |
1421 | HashBucketEntry updated_entry{ new_address, hash.tag(), false }; |
1422 | if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) { |
1423 | assert(thread_ctx().version >= context.version); |
1424 | return (thread_ctx().version == context.version) ? OperationStatus::SUCCESS : |
1425 | OperationStatus::SUCCESS_UNMARK; |
1426 | } else { |
1427 | // CAS failed; try again. |
1428 | new_record->header.invalid = true; |
1429 | pending_context->continue_async(address, expected_entry); |
1430 | return OperationStatus::RETRY_NOW; |
1431 | } |
1432 | } |
1433 | |
1434 | template <class K, class V, class D> |
1435 | void FasterKv<K, V, D>::InitializeCheckpointLocks() { |
1436 | uint32_t table_version = resize_info_.version; |
1437 | uint64_t size = state_[table_version].size(); |
1438 | checkpoint_locks_.Initialize(size); |
1439 | } |
1440 | |
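/// The checkpoint metadata helpers below persist fixed-size structs (the index metadata, the CPR
/// log metadata, and each thread's PersistentExecContext) with a single fwrite()/fread() against
/// "info.dat" or "<guid>.dat" files inside the checkpoint directories. Any failure to open, read,
/// write, or close a file is reported as Status::IOError.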
1441 | template <class K, class V, class D> |
1442 | Status FasterKv<K, V, D>::WriteIndexMetadata() { |
1443 | std::string filename = disk.index_checkpoint_path(checkpoint_.index_token) + "info.dat" ; |
1444 | // (This code will need to be refactored into the disk_t interface, if we want to support |
1445 | // unformatted disks.) |
1446 | std::FILE* file = std::fopen(filename.c_str(), "wb" ); |
1447 | if(!file) { |
1448 | return Status::IOError; |
1449 | } |
1450 | if(std::fwrite(&checkpoint_.index_metadata, sizeof(checkpoint_.index_metadata), 1, file) != 1) { |
1451 | std::fclose(file); |
1452 | return Status::IOError; |
1453 | } |
1454 | if(std::fclose(file) != 0) { |
1455 | return Status::IOError; |
1456 | } |
1457 | return Status::Ok; |
1458 | } |
1459 | |
1460 | template <class K, class V, class D> |
1461 | Status FasterKv<K, V, D>::ReadIndexMetadata(const Guid& token) { |
1462 | std::string filename = disk.index_checkpoint_path(token) + "info.dat" ; |
1463 | // (This code will need to be refactored into the disk_t interface, if we want to support |
1464 | // unformatted disks.) |
1465 | std::FILE* file = std::fopen(filename.c_str(), "rb" ); |
1466 | if(!file) { |
1467 | return Status::IOError; |
1468 | } |
1469 | if(std::fread(&checkpoint_.index_metadata, sizeof(checkpoint_.index_metadata), 1, file) != 1) { |
1470 | std::fclose(file); |
1471 | return Status::IOError; |
1472 | } |
1473 | if(std::fclose(file) != 0) { |
1474 | return Status::IOError; |
1475 | } |
1476 | return Status::Ok; |
1477 | } |
1478 | |
1479 | template <class K, class V, class D> |
1480 | Status FasterKv<K, V, D>::WriteCprMetadata() { |
1481 | std::string filename = disk.cpr_checkpoint_path(checkpoint_.hybrid_log_token) + "info.dat" ; |
1482 | // (This code will need to be refactored into the disk_t interface, if we want to support |
1483 | // unformatted disks.) |
1484 | std::FILE* file = std::fopen(filename.c_str(), "wb" ); |
1485 | if(!file) { |
1486 | return Status::IOError; |
1487 | } |
1488 | if(std::fwrite(&checkpoint_.log_metadata, sizeof(checkpoint_.log_metadata), 1, file) != 1) { |
1489 | std::fclose(file); |
1490 | return Status::IOError; |
1491 | } |
1492 | if(std::fclose(file) != 0) { |
1493 | return Status::IOError; |
1494 | } |
1495 | return Status::Ok; |
1496 | } |
1497 | |
1498 | template <class K, class V, class D> |
1499 | Status FasterKv<K, V, D>::ReadCprMetadata(const Guid& token) { |
1500 | std::string filename = disk.cpr_checkpoint_path(token) + "info.dat" ; |
1501 | // (This code will need to be refactored into the disk_t interface, if we want to support |
1502 | // unformatted disks.) |
1503 | std::FILE* file = std::fopen(filename.c_str(), "rb" ); |
1504 | if(!file) { |
1505 | return Status::IOError; |
1506 | } |
1507 | if(std::fread(&checkpoint_.log_metadata, sizeof(checkpoint_.log_metadata), 1, file) != 1) { |
1508 | std::fclose(file); |
1509 | return Status::IOError; |
1510 | } |
1511 | if(std::fclose(file) != 0) { |
1512 | return Status::IOError; |
1513 | } |
1514 | return Status::Ok; |
1515 | } |
1516 | |
1517 | template <class K, class V, class D> |
1518 | Status FasterKv<K, V, D>::WriteCprContext() { |
1519 | std::string filename = disk.cpr_checkpoint_path(checkpoint_.hybrid_log_token); |
1520 | const Guid& guid = prev_thread_ctx().guid; |
1521 | filename += guid.ToString(); |
1522 | filename += ".dat" ; |
1523 | // (This code will need to be refactored into the disk_t interface, if we want to support |
1524 | // unformatted disks.) |
1525 | std::FILE* file = std::fopen(filename.c_str(), "wb" ); |
1526 | if(!file) { |
1527 | return Status::IOError; |
1528 | } |
1529 | if(std::fwrite(static_cast<PersistentExecContext*>(&prev_thread_ctx()), |
1530 | sizeof(PersistentExecContext), 1, file) != 1) { |
1531 | std::fclose(file); |
1532 | return Status::IOError; |
1533 | } |
1534 | if(std::fclose(file) != 0) { |
1535 | return Status::IOError; |
1536 | } |
1537 | return Status::Ok; |
1538 | } |
1539 | |
1540 | template <class K, class V, class D> |
1541 | Status FasterKv<K, V, D>::ReadCprContexts(const Guid& token, const Guid* guids) { |
1542 | for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) { |
1543 | const Guid& guid = guids[idx]; |
1544 | if(guid == Guid{}) { |
1545 | continue; |
1546 | } |
1547 | std::string filename = disk.cpr_checkpoint_path(token); |
1548 | filename += guid.ToString(); |
1549 | filename += ".dat" ; |
1550 | // (This code will need to be refactored into the disk_t interface, if we want to support |
1551 | // unformatted disks.) |
1552 | std::FILE* file = std::fopen(filename.c_str(), "rb" ); |
1553 | if(!file) { |
1554 | return Status::IOError; |
1555 | } |
1556 | PersistentExecContext context{}; |
1557 | if(std::fread(&context, sizeof(PersistentExecContext), 1, file) != 1) { |
1558 | std::fclose(file); |
1559 | return Status::IOError; |
1560 | } |
1561 | if(std::fclose(file) != 0) { |
1562 | return Status::IOError; |
1563 | } |
1564 | auto result = checkpoint_.continue_tokens.insert({ context.guid, context.serial_num }); |
1565 | assert(result.second); |
1566 | } |
1567 | if(checkpoint_.continue_tokens.size() != checkpoint_.log_metadata.num_threads) { |
1568 | return Status::Corruption; |
1569 | } else { |
1570 | return Status::Ok; |
1571 | } |
1572 | } |
1573 | |
1574 | template <class K, class V, class D> |
1575 | Status FasterKv<K, V, D>::CheckpointFuzzyIndex() { |
1576 | uint32_t hash_table_version = resize_info_.version; |
1577 | // Checkpoint the main hash table. |
1578 | file_t ht_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) + |
1579 | "ht.dat" ); |
1580 | RETURN_NOT_OK(ht_file.Open(&disk.handler())); |
1581 | RETURN_NOT_OK(state_[hash_table_version].Checkpoint(disk, std::move(ht_file), |
1582 | checkpoint_.index_metadata.num_ht_bytes)); |
1583 | // Checkpoint the hash table's overflow buckets. |
1584 | file_t ofb_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) + |
1585 | "ofb.dat" ); |
1586 | RETURN_NOT_OK(ofb_file.Open(&disk.handler())); |
1587 | RETURN_NOT_OK(overflow_buckets_allocator_[hash_table_version].Checkpoint(disk, |
1588 | std::move(ofb_file), checkpoint_.index_metadata.num_ofb_bytes)); |
1589 | checkpoint_.index_checkpoint_started = true; |
1590 | return Status::Ok; |
1591 | } |
1592 | |
1593 | template <class K, class V, class D> |
1594 | Status FasterKv<K, V, D>::CheckpointFuzzyIndexComplete() { |
1595 | if(!checkpoint_.index_checkpoint_started) { |
1596 | return Status::Pending; |
1597 | } |
1598 | uint32_t hash_table_version = resize_info_.version; |
1599 | Status result = state_[hash_table_version].CheckpointComplete(false); |
1600 | if(result == Status::Pending) { |
1601 | return Status::Pending; |
1602 | } else if(result != Status::Ok) { |
1603 | return result; |
1604 | } else { |
1605 | return overflow_buckets_allocator_[hash_table_version].CheckpointComplete(false); |
1606 | } |
1607 | } |
1608 | |
1609 | template <class K, class V, class D> |
1610 | Status FasterKv<K, V, D>::RecoverFuzzyIndex() { |
1611 | uint8_t hash_table_version = resize_info_.version; |
1612 | assert(state_[hash_table_version].size() == checkpoint_.index_metadata.table_size); |
1613 | |
1614 | // Recover the main hash table. |
1615 | file_t ht_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) + |
1616 | "ht.dat" ); |
1617 | RETURN_NOT_OK(ht_file.Open(&disk.handler())); |
1618 | RETURN_NOT_OK(state_[hash_table_version].Recover(disk, std::move(ht_file), |
1619 | checkpoint_.index_metadata.num_ht_bytes)); |
1620 | // Recover the hash table's overflow buckets. |
1621 | file_t ofb_file = disk.NewFile(disk.relative_index_checkpoint_path(checkpoint_.index_token) + |
1622 | "ofb.dat" ); |
1623 | RETURN_NOT_OK(ofb_file.Open(&disk.handler())); |
1624 | return overflow_buckets_allocator_[hash_table_version].Recover(disk, std::move(ofb_file), |
1625 | checkpoint_.index_metadata.num_ofb_bytes, checkpoint_.index_metadata.ofb_count); |
1626 | } |
1627 | |
1628 | template <class K, class V, class D> |
1629 | Status FasterKv<K, V, D>::RecoverFuzzyIndexComplete(bool wait) { |
1630 | uint8_t hash_table_version = resize_info_.version; |
1631 | Status result = state_[hash_table_version].RecoverComplete(true); |
1632 | if(result != Status::Ok) { |
1633 | return result; |
1634 | } |
1635 | result = overflow_buckets_allocator_[hash_table_version].RecoverComplete(true); |
1636 | if(result != Status::Ok) { |
1637 | return result; |
1638 | } |
1639 | |
1640 | // Clear all tentative entries. |
1641 | for(uint64_t bucket_idx = 0; bucket_idx < state_[hash_table_version].size(); ++bucket_idx) { |
1642 | HashBucket* bucket = &state_[hash_table_version].bucket(bucket_idx); |
1643 | while(true) { |
1644 | for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) { |
1645 | if(bucket->entries[entry_idx].load().tentative()) { |
1646 | bucket->entries[entry_idx].store(HashBucketEntry::kInvalidEntry); |
1647 | } |
1648 | } |
1649 | // Go to next bucket in the chain |
1650 | HashBucketOverflowEntry entry = bucket->overflow_entry.load(); |
1651 | if(entry.unused()) { |
1652 | // No more buckets in the chain. |
1653 | break; |
1654 | } |
1655 | bucket = &overflow_buckets_allocator_[hash_table_version].Get(entry.address()); |
1656 | assert(reinterpret_cast<size_t>(bucket) % Constants::kCacheLineBytes == 0); |
1657 | } |
1658 | } |
1659 | return Status::Ok; |
1660 | } |
1661 | |
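/// Replays the tail of the hybrid log (from checkpoint_start_address up to the CPR final_address)
/// directly from the log device, pipelined page by page: up to hlog.buffer_size() pages are read
/// ahead; each page is waited on until ReadDone, replayed via RecoverFromPage(), and then flushed
/// back, with the flush callback issuing the read for the page `capacity` pages further on. The
/// function returns once every page has reached FlushDone.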
1662 | template <class K, class V, class D> |
1663 | Status FasterKv<K, V, D>::RecoverHybridLog() { |
1664 | class Context : public IAsyncContext { |
1665 | public: |
1666 | Context(hlog_t& hlog_, uint32_t page_, RecoveryStatus& recovery_status_) |
1667 | : hlog{ &hlog_} |
1668 | , page{ page_ } |
1669 | , recovery_status{ &recovery_status_ } { |
1670 | } |
1671 | /// The deep-copy constructor |
1672 | Context(const Context& other) |
1673 | : hlog{ other.hlog } |
1674 | , page{ other.page } |
1675 | , recovery_status{ other.recovery_status } { |
1676 | } |
1677 | protected: |
1678 | Status DeepCopy_Internal(IAsyncContext*& context_copy) final { |
1679 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
1680 | } |
1681 | public: |
1682 | hlog_t* hlog; |
1683 | uint32_t page; |
1684 | RecoveryStatus* recovery_status; |
1685 | }; |
1686 | |
1687 | auto callback = [](IAsyncContext* ctxt, Status result) { |
1688 | CallbackContext<Context> context{ ctxt }; |
1689 | result = context->hlog->AsyncReadPagesFromLog(context->page, 1, *context->recovery_status); |
1690 | }; |
1691 | |
1692 | Address from_address = checkpoint_.index_metadata.checkpoint_start_address; |
1693 | Address to_address = checkpoint_.log_metadata.final_address; |
1694 | |
1695 | uint32_t start_page = from_address.page(); |
1696 | uint32_t end_page = to_address.offset() > 0 ? to_address.page() + 1 : to_address.page(); |
1697 | uint32_t capacity = hlog.buffer_size(); |
1698 | RecoveryStatus recovery_status{ start_page, end_page }; |
// Initially issue read requests for all pages that can be held in memory.
1700 | uint32_t total_pages_to_read = end_page - start_page; |
1701 | uint32_t pages_to_read_first = std::min(capacity, total_pages_to_read); |
1702 | RETURN_NOT_OK(hlog.AsyncReadPagesFromLog(start_page, pages_to_read_first, recovery_status)); |
1703 | |
1704 | for(uint32_t page = start_page; page < end_page; ++page) { |
1705 | while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) { |
1706 | disk.TryComplete(); |
1707 | std::this_thread::sleep_for(10ms); |
1708 | } |
1709 | |
1710 | // handle start and end at non-page boundaries |
1711 | RETURN_NOT_OK(RecoverFromPage(page == start_page ? from_address : Address{ page, 0 }, |
1712 | page + 1 == end_page ? to_address : |
1713 | Address{ page, Address::kMaxOffset })); |
1714 | |
1715 | // OS thread flushes current page and issues a read request if necessary |
1716 | if(page + capacity < end_page) { |
1717 | Context context{ hlog, page + capacity, recovery_status }; |
1718 | RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, callback, &context)); |
1719 | } else { |
1720 | RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, nullptr, nullptr)); |
1721 | } |
1722 | } |
1723 | // Wait until all pages have been flushed |
1724 | for(uint32_t page = start_page; page < end_page; ++page) { |
1725 | while(recovery_status.page_status(page) != PageRecoveryStatus::FlushDone) { |
1726 | disk.TryComplete(); |
1727 | std::this_thread::sleep_for(10ms); |
1728 | } |
1729 | } |
1730 | return Status::Ok; |
1731 | } |
1732 | |
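/// Snapshot-file variant of RecoverHybridLog(): pages are read from the checkpoint's
/// "snapshot.dat" file (starting at the flushed_address recorded in the CPR metadata) rather than
/// from the log device, and RecoverFromPage() is applied only to pages that overlap the fuzzy
/// region at or beyond checkpoint_start_address.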
1733 | template <class K, class V, class D> |
1734 | Status FasterKv<K, V, D>::RecoverHybridLogFromSnapshotFile() { |
1735 | class Context : public IAsyncContext { |
1736 | public: |
1737 | Context(hlog_t& hlog_, file_t& file_, uint32_t file_start_page_, uint32_t page_, |
1738 | RecoveryStatus& recovery_status_) |
1739 | : hlog{ &hlog_ } |
1740 | , file{ &file_ } |
1741 | , file_start_page{ file_start_page_ } |
1742 | , page{ page_ } |
1743 | , recovery_status{ &recovery_status_ } { |
1744 | } |
1745 | /// The deep-copy constructor |
1746 | Context(const Context& other) |
1747 | : hlog{ other.hlog } |
1748 | , file{ other.file } |
1749 | , file_start_page{ other.file_start_page } |
1750 | , page{ other.page } |
1751 | , recovery_status{ other.recovery_status } { |
1752 | } |
1753 | protected: |
1754 | Status DeepCopy_Internal(IAsyncContext*& context_copy) final { |
1755 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
1756 | } |
1757 | public: |
1758 | hlog_t* hlog; |
1759 | file_t* file; |
1760 | uint32_t file_start_page; |
1761 | uint32_t page; |
1762 | RecoveryStatus* recovery_status; |
1763 | }; |
1764 | |
1765 | auto callback = [](IAsyncContext* ctxt, Status result) { |
1766 | CallbackContext<Context> context{ ctxt }; |
1767 | result = context->hlog->AsyncReadPagesFromSnapshot(*context->file, |
1768 | context->file_start_page, context->page, 1, *context->recovery_status); |
1769 | }; |
1770 | |
1771 | Address file_start_address = checkpoint_.log_metadata.flushed_address; |
1772 | Address from_address = checkpoint_.index_metadata.checkpoint_start_address; |
1773 | Address to_address = checkpoint_.log_metadata.final_address; |
1774 | |
1775 | uint32_t start_page = file_start_address.page(); |
1776 | uint32_t end_page = to_address.offset() > 0 ? to_address.page() + 1 : to_address.page(); |
1777 | uint32_t capacity = hlog.buffer_size(); |
1778 | RecoveryStatus recovery_status{ start_page, end_page }; |
1779 | checkpoint_.snapshot_file = disk.NewFile(disk.relative_cpr_checkpoint_path( |
1780 | checkpoint_.hybrid_log_token) + "snapshot.dat" ); |
1781 | RETURN_NOT_OK(checkpoint_.snapshot_file.Open(&disk.handler())); |
1782 | |
// Initially issue read requests for all pages that can be held in memory.
1784 | uint32_t total_pages_to_read = end_page - start_page; |
1785 | uint32_t pages_to_read_first = std::min(capacity, total_pages_to_read); |
1786 | RETURN_NOT_OK(hlog.AsyncReadPagesFromSnapshot(checkpoint_.snapshot_file, start_page, start_page, |
1787 | pages_to_read_first, recovery_status)); |
1788 | |
1789 | for(uint32_t page = start_page; page < end_page; ++page) { |
1790 | while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) { |
1791 | disk.TryComplete(); |
1792 | std::this_thread::sleep_for(10ms); |
1793 | } |
1794 | |
1795 | // Perform recovery if page in fuzzy portion of the log |
1796 | if(Address{ page + 1, 0 } > from_address) { |
1797 | // handle start and end at non-page boundaries |
1798 | RETURN_NOT_OK(RecoverFromPage(page == from_address.page() ? from_address : |
1799 | Address{ page, 0 }, |
1800 | page + 1 == end_page ? to_address : |
1801 | Address{ page, Address::kMaxOffset })); |
1802 | } |
1803 | |
1804 | // OS thread flushes current page and issues a read request if necessary |
1805 | if(page + capacity < end_page) { |
1806 | Context context{ hlog, checkpoint_.snapshot_file, start_page, page + capacity, |
1807 | recovery_status }; |
1808 | RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, callback, &context)); |
1809 | } else { |
1810 | RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, nullptr, nullptr)); |
1811 | } |
1812 | } |
1813 | // Wait until all pages have been flushed |
1814 | for(uint32_t page = start_page; page < end_page; ++page) { |
1815 | while(recovery_status.page_status(page) != PageRecoveryStatus::FlushDone) { |
1816 | disk.TryComplete(); |
1817 | std::this_thread::sleep_for(10ms); |
1818 | } |
1819 | } |
1820 | return Status::Ok; |
1821 | } |
1822 | |
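/// Replays one in-memory page, record by record, over [from_address, to_address):
///  - a null header is skipped (the scan just advances past the header);
///  - records already marked invalid are skipped;
///  - records whose checkpoint_version is at or below the recovered version are re-linked into
///    the hash table at their log address;
///  - newer records are marked invalid, and the hash entry is rolled back to their
///    previous_address when that address precedes checkpoint_start_address.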
1823 | template <class K, class V, class D> |
1824 | Status FasterKv<K, V, D>::RecoverFromPage(Address from_address, Address to_address) { |
1825 | assert(from_address.page() == to_address.page()); |
1826 | for(Address address = from_address; address < to_address;) { |
1827 | record_t* record = reinterpret_cast<record_t*>(hlog.Get(address)); |
1828 | if(record->header.IsNull()) { |
1829 | address += sizeof(record->header); |
1830 | continue; |
1831 | } |
1832 | if(record->header.invalid) { |
1833 | address += record->size(); |
1834 | continue; |
1835 | } |
1836 | const key_t& key = record->key(); |
1837 | KeyHash hash = key.GetHash(); |
1838 | HashBucketEntry expected_entry; |
1839 | HashBucket* bucket; |
1840 | AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry, bucket); |
1841 | |
1842 | if(record->header.checkpoint_version <= checkpoint_.log_metadata.version) { |
1843 | HashBucketEntry new_entry{ address, hash.tag(), false }; |
1844 | atomic_entry->store(new_entry); |
1845 | } else { |
1846 | record->header.invalid = true; |
1847 | if(record->header.previous_address() < checkpoint_.index_metadata.checkpoint_start_address) { |
1848 | HashBucketEntry new_entry{ record->header.previous_address(), hash.tag(), false }; |
1849 | atomic_entry->store(new_entry); |
1850 | } |
1851 | } |
1852 | address += record->size(); |
1853 | } |
1854 | |
1855 | return Status::Ok; |
1856 | } |
1857 | |
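/// Reloads the tail of the recovered log into the in-memory circular buffer (as many pages as
/// fit, leaving room for the head pages), then resets the log's begin address (from the index
/// metadata), head address (the start of the reloaded region, skipping the null first cache line
/// of page 0), and tail address (the checkpoint's final_address).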
1858 | template <class K, class V, class D> |
1859 | Status FasterKv<K, V, D>::RestoreHybridLog() { |
1860 | Address tail_address = checkpoint_.log_metadata.final_address; |
1861 | uint32_t end_page = tail_address.offset() > 0 ? tail_address.page() + 1 : tail_address.page(); |
1862 | uint32_t capacity = hlog.buffer_size(); |
1863 | // Restore as much of the log as will fit in memory. |
1864 | uint32_t start_page; |
1865 | if(end_page < capacity - hlog.kNumHeadPages) { |
1866 | start_page = 0; |
1867 | } else { |
1868 | start_page = end_page - (capacity - hlog.kNumHeadPages); |
1869 | } |
1870 | RecoveryStatus recovery_status{ start_page, end_page }; |
1871 | |
1872 | uint32_t num_pages = end_page - start_page; |
1873 | RETURN_NOT_OK(hlog.AsyncReadPagesFromLog(start_page, num_pages, recovery_status)); |
1874 | |
1875 | // Wait until all pages have been read. |
1876 | for(uint32_t page = start_page; page < end_page; ++page) { |
1877 | while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) { |
1878 | disk.TryComplete(); |
1879 | std::this_thread::sleep_for(10ms); |
1880 | } |
1881 | } |
1882 | // Skip the null page. |
1883 | Address head_address = start_page == 0 ? Address{ 0, Constants::kCacheLineBytes } : |
1884 | Address{ start_page, 0 }; |
1885 | hlog.RecoveryReset(checkpoint_.index_metadata.log_begin_address, head_address, tail_address); |
1886 | return Status::Ok; |
1887 | } |
1888 | |
1889 | template <class K, class V, class D> |
1890 | void FasterKv<K, V, D>::HeavyEnter() { |
1891 | if(thread_ctx().phase == Phase::GC_IO_PENDING || thread_ctx().phase == Phase::GC_IN_PROGRESS) { |
1892 | CleanHashTableBuckets(); |
1893 | return; |
1894 | } |
1895 | while(thread_ctx().phase == Phase::GROW_PREPARE) { |
1896 | // We spin-wait as a simplification |
1897 | // Could instead do a "heavy operation" here |
1898 | std::this_thread::yield(); |
1899 | Refresh(); |
1900 | } |
1901 | if(thread_ctx().phase == Phase::GROW_IN_PROGRESS) { |
1902 | SplitHashTableBuckets(); |
1903 | } |
1904 | } |
1905 | |
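/// Garbage-collects hash table entries after the log's begin address has been shifted forward.
/// Each call claims one chunk of buckets (gc_.next_chunk) and, for every entry that points below
/// hlog.begin_address, tries to CAS the entry to kInvalidEntry; a failed CAS just means another
/// thread already installed a newer record there. Returns false once no chunks remain, i.e. this
/// thread's share of the GC work is done.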
1906 | template <class K, class V, class D> |
1907 | bool FasterKv<K, V, D>::CleanHashTableBuckets() { |
1908 | uint64_t chunk = gc_.next_chunk++; |
1909 | if(chunk >= gc_.num_chunks) { |
1910 | // No chunk left to clean. |
1911 | return false; |
1912 | } |
1913 | uint8_t version = resize_info_.version; |
1914 | Address begin_address = hlog.begin_address.load(); |
1915 | uint64_t upper_bound; |
if(chunk + 1 < gc_.num_chunks) {
// All chunks but the last chunk contain kGcHashTableChunkSize elements.
upper_bound = kGcHashTableChunkSize;
1919 | } else { |
1920 | // Last chunk might contain more or fewer elements. |
1921 | upper_bound = state_[version].size() - (chunk * kGcHashTableChunkSize); |
1922 | } |
1923 | for(uint64_t idx = 0; idx < upper_bound; ++idx) { |
1924 | HashBucket* bucket = &state_[version].bucket(chunk * kGcHashTableChunkSize + idx); |
1925 | while(true) { |
1926 | for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) { |
1927 | AtomicHashBucketEntry& atomic_entry = bucket->entries[entry_idx]; |
1928 | HashBucketEntry expected_entry = atomic_entry.load(); |
1929 | if(!expected_entry.unused() && expected_entry.address() != Address::kInvalidAddress && |
1930 | expected_entry.address() < begin_address) { |
1931 | // The record that this entry points to was truncated; try to delete the entry. |
1932 | atomic_entry.compare_exchange_strong(expected_entry, HashBucketEntry::kInvalidEntry); |
1933 | // If deletion failed, then some other thread must have added a new record to the entry. |
1934 | } |
1935 | } |
1936 | // Go to next bucket in the chain. |
1937 | HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load(); |
1938 | if(overflow_entry.unused()) { |
1939 | // No more buckets in the chain. |
1940 | break; |
1941 | } |
1942 | bucket = &overflow_buckets_allocator_[version].Get(overflow_entry.address()); |
1943 | } |
1944 | } |
1945 | // Done with this chunk--did some work. |
1946 | return true; |
1947 | } |
1948 | |
1949 | template <class K, class V, class D> |
1950 | void FasterKv<K, V, D>::AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version, |
1951 | HashBucketEntry entry) { |
1952 | if(next_idx == HashBucket::kNumEntries) { |
1953 | // Need to allocate a new bucket, first. |
1954 | FixedPageAddress new_bucket_addr = overflow_buckets_allocator_[version].Allocate(); |
1955 | HashBucketOverflowEntry new_bucket_entry{ new_bucket_addr }; |
1956 | bucket->overflow_entry.store(new_bucket_entry); |
1957 | bucket = &overflow_buckets_allocator_[version].Get(new_bucket_addr); |
1958 | next_idx = 0; |
1959 | } |
1960 | bucket->entries[next_idx].store(entry); |
1961 | ++next_idx; |
1962 | } |
1963 | |
1964 | template <class K, class V, class D> |
1965 | Address FasterKv<K, V, D>::TraceBackForOtherChainStart(uint64_t old_size, uint64_t new_size, |
1966 | Address from_address, Address min_address, uint8_t side) { |
1967 | assert(side == 0 || side == 1); |
1968 | // Search back as far as min_address. |
1969 | while(from_address >= min_address) { |
1970 | const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(from_address)); |
1971 | KeyHash hash = record->key().GetHash(); |
1972 | if((hash.idx(new_size) < old_size) != (side == 0)) { |
1973 | // Record's key hashes to the other side. |
1974 | return from_address; |
1975 | } |
1976 | from_address = record->header.previous_address(); |
1977 | } |
1978 | return from_address; |
1979 | } |
1980 | |
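/// Splits every old bucket chain across the doubled hash table. Since new_size == 2 * old_size,
/// a key that maps to slot i in the old table maps to either slot i ("side 0") or slot
/// i + old_size ("side 1") in the new table (assuming KeyHash::idx() reduces the hash modulo the
/// table size). For each old entry:
///  - if the target record is below head_address, its key cannot be inspected, so the entry is
///    conservatively added to both new buckets;
///  - otherwise the record's own hash picks one side, and TraceBackForOtherChainStart() finds the
///    first record in the chain that belongs (or, if on disk, may belong) to the other side, so
///    that side's chain stays reachable too.
/// The thread that finishes the last pending chunk frees the old table; the last thread to ack
/// the phase advances the global state so other threads can use the new table.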
1981 | template <class K, class V, class D> |
1982 | void FasterKv<K, V, D>::SplitHashTableBuckets() { |
1983 | // This thread won't exit until all hash table buckets have been split. |
1984 | Address head_address = hlog.head_address.load(); |
1985 | Address begin_address = hlog.begin_address.load(); |
1986 | for(uint64_t chunk = grow_.next_chunk++; chunk < grow_.num_chunks; chunk = grow_.next_chunk++) { |
1987 | uint64_t old_size = state_[grow_.old_version].size(); |
1988 | uint64_t new_size = state_[grow_.new_version].size(); |
1989 | assert(new_size == old_size * 2); |
1990 | // Split this chunk. |
1991 | uint64_t upper_bound; |
1992 | if(chunk + 1 < grow_.num_chunks) { |
1993 | // All chunks but the last chunk contain kGrowHashTableChunkSize elements. |
1994 | upper_bound = kGrowHashTableChunkSize; |
1995 | } else { |
1996 | // Last chunk might contain more or fewer elements. |
1997 | upper_bound = old_size - (chunk * kGrowHashTableChunkSize); |
1998 | } |
1999 | for(uint64_t idx = 0; idx < upper_bound; ++idx) { |
2000 | |
2001 | // Split this (chain of) bucket(s). |
2002 | HashBucket* old_bucket = &state_[grow_.old_version].bucket( |
2003 | chunk * kGrowHashTableChunkSize + idx); |
2004 | HashBucket* new_bucket0 = &state_[grow_.new_version].bucket( |
2005 | chunk * kGrowHashTableChunkSize + idx); |
2006 | HashBucket* new_bucket1 = &state_[grow_.new_version].bucket( |
2007 | old_size + chunk * kGrowHashTableChunkSize + idx); |
2008 | uint32_t new_entry_idx0 = 0; |
2009 | uint32_t new_entry_idx1 = 0; |
2010 | while(true) { |
2011 | for(uint32_t old_entry_idx = 0; old_entry_idx < HashBucket::kNumEntries; ++old_entry_idx) { |
2012 | HashBucketEntry old_entry = old_bucket->entries[old_entry_idx].load(); |
2013 | if(old_entry.unused()) { |
2014 | // Nothing to do. |
2015 | continue; |
2016 | } else if(old_entry.address() < head_address) { |
2017 | // Can't tell which new bucket the entry should go into; put it in both. |
2018 | AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, old_entry); |
2019 | AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, old_entry); |
2020 | continue; |
2021 | } |
2022 | |
2023 | const record_t* record = reinterpret_cast<const record_t*>(hlog.Get( |
2024 | old_entry.address())); |
2025 | KeyHash hash = record->key().GetHash(); |
2026 | if(hash.idx(new_size) < old_size) { |
2027 | // Record's key hashes to the 0 side of the new hash table. |
2028 | AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, old_entry); |
2029 | Address other_address = TraceBackForOtherChainStart(old_size, new_size, |
2030 | record->header.previous_address(), head_address, 0); |
2031 | if(other_address >= begin_address) { |
2032 | // We found a record that either is on disk or has a key that hashes to the 1 side of |
2033 | // the new hash table. |
2034 | AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, |
2035 | HashBucketEntry{ other_address, old_entry.tag(), false }); |
2036 | } |
2037 | } else { |
2038 | // Record's key hashes to the 1 side of the new hash table. |
2039 | AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, old_entry); |
2040 | Address other_address = TraceBackForOtherChainStart(old_size, new_size, |
2041 | record->header.previous_address(), head_address, 1); |
2042 | if(other_address >= begin_address) { |
2043 | // We found a record that either is on disk or has a key that hashes to the 0 side of |
2044 | // the new hash table. |
2045 | AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, |
2046 | HashBucketEntry{ other_address, old_entry.tag(), false }); |
2047 | } |
2048 | } |
2049 | } |
2050 | // Go to next bucket in the chain. |
2051 | HashBucketOverflowEntry overflow_entry = old_bucket->overflow_entry.load(); |
2052 | if(overflow_entry.unused()) { |
2053 | // No more buckets in the chain. |
2054 | break; |
2055 | } |
2056 | old_bucket = &overflow_buckets_allocator_[grow_.old_version].Get(overflow_entry.address()); |
2057 | } |
2058 | } |
2059 | // Done with this chunk. |
2060 | if(--grow_.num_pending_chunks == 0) { |
2061 | // Free the old hash table. |
2062 | state_[grow_.old_version].Uninitialize(); |
2063 | overflow_buckets_allocator_[grow_.old_version].Uninitialize(); |
2064 | break; |
2065 | } |
2066 | } |
2067 | // Thread has finished growing its part of the hash table. |
2068 | thread_ctx().phase = Phase::REST; |
2069 | // Thread ack that it has finished growing the hash table. |
2070 | if(epoch_.FinishThreadPhase(Phase::GROW_IN_PROGRESS)) { |
2071 | // Let other threads know that they can use the new hash table now. |
2072 | GlobalMoveToNextState(SystemState{ Action::GrowIndex, Phase::GROW_IN_PROGRESS, |
2073 | thread_ctx().version }); |
2074 | } else { |
2075 | while(system_state_.load().phase == Phase::GROW_IN_PROGRESS) { |
2076 | // Spin until all other threads have finished splitting their chunks. |
2077 | std::this_thread::yield(); |
2078 | } |
2079 | } |
2080 | } |
2081 | |
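/// Tries to advance the global system state from `current_state` to its successor with a CAS;
/// exactly one thread wins. The winner performs the one-time global work for the new phase (e.g.
/// issuing the fuzzy index checkpoint, writing metadata files, truncating the log, or swapping in
/// the new hash table) before returning true. Per-thread work for each phase is done in
/// HandleSpecialPhases().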
2082 | template <class K, class V, class D> |
2083 | bool FasterKv<K, V, D>::GlobalMoveToNextState(SystemState current_state) { |
2084 | SystemState next_state = current_state.GetNextState(); |
2085 | if(!system_state_.compare_exchange_strong(current_state, next_state)) { |
2086 | return false; |
2087 | } |
2088 | |
2089 | switch(next_state.action) { |
2090 | case Action::CheckpointFull: |
2091 | case Action::CheckpointIndex: |
2092 | case Action::CheckpointHybridLog: |
2093 | switch(next_state.phase) { |
2094 | case Phase::PREP_INDEX_CHKPT: |
2095 | // This case is handled directly inside Checkpoint[Index](). |
2096 | assert(false); |
2097 | break; |
2098 | case Phase::INDEX_CHKPT: |
2099 | assert(next_state.action != Action::CheckpointHybridLog); |
2100 | // Issue async request for fuzzy checkpoint |
2101 | assert(!checkpoint_.failed); |
2102 | if(CheckpointFuzzyIndex() != Status::Ok) { |
2103 | checkpoint_.failed = true; |
2104 | } |
2105 | break; |
2106 | case Phase::PREPARE: |
// An index checkpoint will never reach this state, and CheckpointHybridLog() handles this
// case directly.
2109 | assert(next_state.action == Action::CheckpointFull); |
2110 | // INDEX_CHKPT -> PREPARE |
2111 | // Get an overestimate for the ofb's tail, after we've finished fuzzy-checkpointing the ofb. |
2112 | // (Ensures that recovery won't accidentally reallocate from the ofb.) |
2113 | checkpoint_.index_metadata.ofb_count = |
2114 | overflow_buckets_allocator_[resize_info_.version].count(); |
2115 | // Write index meta data on disk |
2116 | if(WriteIndexMetadata() != Status::Ok) { |
2117 | checkpoint_.failed = true; |
2118 | } |
2119 | if(checkpoint_.index_persistence_callback) { |
2120 | // Notify the host that the index checkpoint has completed. |
2121 | checkpoint_.index_persistence_callback(Status::Ok); |
2122 | } |
2123 | break; |
2124 | case Phase::IN_PROGRESS: { |
2125 | assert(next_state.action != Action::CheckpointIndex); |
2126 | // PREPARE -> IN_PROGRESS |
2127 | // Do nothing |
2128 | break; |
2129 | } |
2130 | case Phase::WAIT_PENDING: |
2131 | assert(next_state.action != Action::CheckpointIndex); |
2132 | // IN_PROGRESS -> WAIT_PENDING |
2133 | // Do nothing |
2134 | break; |
2135 | case Phase::WAIT_FLUSH: |
2136 | assert(next_state.action != Action::CheckpointIndex); |
2137 | // WAIT_PENDING -> WAIT_FLUSH |
2138 | if(fold_over_snapshot) { |
2139 | // Move read-only to tail |
2140 | Address tail_address = hlog.ShiftReadOnlyToTail(); |
2141 | // Get final address for CPR |
2142 | checkpoint_.log_metadata.final_address = tail_address; |
2143 | } else { |
2144 | Address tail_address = hlog.GetTailAddress(); |
2145 | // Get final address for CPR |
2146 | checkpoint_.log_metadata.final_address = tail_address; |
2147 | checkpoint_.snapshot_file = disk.NewFile(disk.relative_cpr_checkpoint_path( |
2148 | checkpoint_.hybrid_log_token) + "snapshot.dat" ); |
2149 | if(checkpoint_.snapshot_file.Open(&disk.handler()) != Status::Ok) { |
2150 | checkpoint_.failed = true; |
2151 | } |
2152 | // Flush the log to a snapshot. |
2153 | hlog.AsyncFlushPagesToFile(checkpoint_.log_metadata.flushed_address.page(), |
2154 | checkpoint_.log_metadata.final_address, checkpoint_.snapshot_file, |
2155 | checkpoint_.flush_pending); |
2156 | } |
2157 | // Write CPR meta data file |
2158 | if(WriteCprMetadata() != Status::Ok) { |
2159 | checkpoint_.failed = true; |
2160 | } |
2161 | break; |
2162 | case Phase::PERSISTENCE_CALLBACK: |
2163 | assert(next_state.action != Action::CheckpointIndex); |
2164 | // WAIT_FLUSH -> PERSISTENCE_CALLBACK |
2165 | break; |
2166 | case Phase::REST: |
2167 | // PERSISTENCE_CALLBACK -> REST or INDEX_CHKPT -> REST |
2168 | if(next_state.action != Action::CheckpointIndex) { |
2169 | // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before |
2170 | // another checkpoint can be started.) |
2171 | checkpoint_.CheckpointDone(); |
2172 | // Free checkpoint locks! |
2173 | checkpoint_locks_.Free(); |
2174 | // Checkpoint is done--no more work for threads to do. |
2175 | system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version }); |
2176 | } else { |
2177 | // Get an overestimate for the ofb's tail, after we've finished fuzzy-checkpointing the |
2178 | // ofb. (Ensures that recovery won't accidentally reallocate from the ofb.) |
2179 | checkpoint_.index_metadata.ofb_count = |
2180 | overflow_buckets_allocator_[resize_info_.version].count(); |
2181 | // Write index meta data on disk |
2182 | if(WriteIndexMetadata() != Status::Ok) { |
2183 | checkpoint_.failed = true; |
2184 | } |
2185 | auto index_persistence_callback = checkpoint_.index_persistence_callback; |
2186 | // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before |
2187 | // another checkpoint can be started.) |
2188 | checkpoint_.CheckpointDone(); |
2189 | // Checkpoint is done--no more work for threads to do. |
2190 | system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version }); |
2191 | if(index_persistence_callback) { |
2192 | // Notify the host that the index checkpoint has completed. |
2193 | index_persistence_callback(Status::Ok); |
2194 | } |
2195 | } |
2196 | break; |
2197 | default: |
2198 | // not reached |
2199 | assert(false); |
2200 | break; |
2201 | } |
2202 | break; |
2203 | case Action::GC: |
2204 | switch(next_state.phase) { |
2205 | case Phase::GC_IO_PENDING: |
2206 | // This case is handled directly inside ShiftBeginAddress(). |
2207 | assert(false); |
2208 | break; |
2209 | case Phase::GC_IN_PROGRESS: |
2210 | // GC_IO_PENDING -> GC_IN_PROGRESS |
2211 | // Tell the disk to truncate the log. |
2212 | hlog.Truncate(gc_.truncate_callback); |
2213 | break; |
2214 | case Phase::REST: |
2215 | // GC_IN_PROGRESS -> REST |
2216 | // GC is done--no more work for threads to do. |
2217 | if(gc_.complete_callback) { |
2218 | gc_.complete_callback(); |
2219 | } |
2220 | system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version }); |
2221 | break; |
2222 | default: |
2223 | // not reached |
2224 | assert(false); |
2225 | break; |
2226 | } |
2227 | break; |
2228 | case Action::GrowIndex: |
2229 | switch(next_state.phase) { |
2230 | case Phase::GROW_PREPARE: |
2231 | // This case is handled directly inside GrowIndex(). |
2232 | assert(false); |
2233 | break; |
2234 | case Phase::GROW_IN_PROGRESS: |
2235 | // Swap hash table versions so that all threads will use the new version after populating it. |
2236 | resize_info_.version = grow_.new_version; |
2237 | break; |
2238 | case Phase::REST: |
2239 | if(grow_.callback) { |
2240 | grow_.callback(state_[grow_.new_version].size()); |
2241 | } |
2242 | system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version }); |
2243 | break; |
2244 | default: |
2245 | // not reached |
2246 | assert(false); |
2247 | break; |
2248 | } |
2249 | break; |
2250 | default: |
2251 | // not reached |
2252 | assert(false); |
2253 | break; |
2254 | } |
2255 | return true; |
2256 | } |
2257 | |
2258 | template <class K, class V, class D> |
2259 | void FasterKv<K, V, D>::MarkAllPendingRequests() { |
2260 | uint32_t table_version = resize_info_.version; |
2261 | uint64_t table_size = state_[table_version].size(); |
2262 | |
2263 | for(const IAsyncContext* ctxt : thread_ctx().retry_requests) { |
2264 | const pending_context_t* context = static_cast<const pending_context_t*>(ctxt); |
// We will succeed, because no other thread can currently advance the entry's version: this
// thread hasn't acked "PENDING" phase completion yet.
2267 | bool result = checkpoint_locks_.get_lock(context->key().GetHash()).try_lock_old(); |
2268 | assert(result); |
2269 | } |
2270 | for(const auto& pending_io : thread_ctx().pending_ios) { |
// We will succeed, because no other thread can currently advance the entry's version: this
// thread hasn't acked "PENDING" phase completion yet.
2273 | bool result = checkpoint_locks_.get_lock(pending_io.second).try_lock_old(); |
2274 | assert(result); |
2275 | } |
2276 | } |
2277 | |
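/// Brings this thread up to date when the store is not in the REST phase: walks the thread
/// forward from the phase recorded in its context to the current global phase, one transition at
/// a time, performing the per-thread work for each step (marking pending requests, swapping
/// execution contexts, draining pending I/Os, writing the CPR context, running callbacks,
/// cleaning or splitting hash buckets). The last thread to ack a phase via
/// epoch_.FinishThreadPhase() calls GlobalMoveToNextState() to move the whole store forward.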
2278 | template <class K, class V, class D> |
2279 | void FasterKv<K, V, D>::HandleSpecialPhases() { |
2280 | SystemState final_state = system_state_.load(); |
2281 | if(final_state.phase == Phase::REST) { |
2282 | // Nothing to do; just reset thread context. |
2283 | thread_ctx().phase = Phase::REST; |
2284 | thread_ctx().version = final_state.version; |
2285 | return; |
2286 | } |
2287 | SystemState previous_state{ final_state.action, thread_ctx().phase, thread_ctx().version }; |
2288 | do { |
2289 | // Identify the transition (currentState -> nextState) |
2290 | SystemState current_state = (previous_state == final_state) ? final_state : |
2291 | previous_state.GetNextState(); |
2292 | switch(current_state.action) { |
2293 | case Action::CheckpointFull: |
2294 | case Action::CheckpointIndex: |
2295 | case Action::CheckpointHybridLog: |
2296 | switch(current_state.phase) { |
2297 | case Phase::PREP_INDEX_CHKPT: |
2298 | assert(current_state.action != Action::CheckpointHybridLog); |
2299 | // Both from REST -> PREP_INDEX_CHKPT and PREP_INDEX_CHKPT -> PREP_INDEX_CHKPT |
2300 | if(previous_state.phase == Phase::REST) { |
2301 | // Thread ack that we're performing a checkpoint. |
2302 | if(epoch_.FinishThreadPhase(Phase::PREP_INDEX_CHKPT)) { |
2303 | GlobalMoveToNextState(current_state); |
2304 | } |
2305 | } |
2306 | break; |
2307 | case Phase::INDEX_CHKPT: { |
2308 | assert(current_state.action != Action::CheckpointHybridLog); |
2309 | // Both from PREP_INDEX_CHKPT -> INDEX_CHKPT and INDEX_CHKPT -> INDEX_CHKPT |
2310 | Status result = CheckpointFuzzyIndexComplete(); |
2311 | if(result != Status::Pending && result != Status::Ok) { |
2312 | checkpoint_.failed = true; |
2313 | } |
2314 | if(result != Status::Pending) { |
2315 | if(current_state.action == Action::CheckpointIndex) { |
2316 | // This thread is done now. |
2317 | thread_ctx().phase = Phase::REST; |
2318 | // Thread ack that it is done. |
2319 | if(epoch_.FinishThreadPhase(Phase::INDEX_CHKPT)) { |
2320 | GlobalMoveToNextState(current_state); |
2321 | } |
2322 | } else { |
2323 | // Index checkpoint is done; move on to PREPARE phase. |
2324 | GlobalMoveToNextState(current_state); |
2325 | } |
2326 | } |
2327 | break; |
2328 | } |
2329 | case Phase::PREPARE: |
2330 | assert(current_state.action != Action::CheckpointIndex); |
2331 | // Handle (INDEX_CHKPT -> PREPARE or REST -> PREPARE) and PREPARE -> PREPARE |
2332 | if(previous_state.phase != Phase::PREPARE) { |
2333 | // mark pending requests |
2334 | MarkAllPendingRequests(); |
2335 | // keep a count of number of threads |
2336 | ++checkpoint_.log_metadata.num_threads; |
2337 | // set the thread index |
2338 | checkpoint_.log_metadata.guids[Thread::id()] = thread_ctx().guid; |
2339 | // Thread ack that it has finished marking its pending requests. |
2340 | if(epoch_.FinishThreadPhase(Phase::PREPARE)) { |
2341 | GlobalMoveToNextState(current_state); |
2342 | } |
2343 | } |
2344 | break; |
2345 | case Phase::IN_PROGRESS: |
2346 | assert(current_state.action != Action::CheckpointIndex); |
2347 | // Handle PREPARE -> IN_PROGRESS and IN_PROGRESS -> IN_PROGRESS |
2348 | if(previous_state.phase == Phase::PREPARE) { |
2349 | assert(prev_thread_ctx().retry_requests.empty()); |
2350 | assert(prev_thread_ctx().pending_ios.empty()); |
2351 | assert(prev_thread_ctx().io_responses.empty()); |
2352 | |
2353 | // Get a new thread context; keep track of the old one as "previous." |
2354 | thread_contexts_[Thread::id()].swap(); |
2355 | // initialize a new local context |
2356 | thread_ctx().Initialize(Phase::IN_PROGRESS, current_state.version, |
2357 | prev_thread_ctx().guid, prev_thread_ctx().serial_num); |
2358 | // Thread ack that it has swapped contexts. |
2359 | if(epoch_.FinishThreadPhase(Phase::IN_PROGRESS)) { |
2360 | GlobalMoveToNextState(current_state); |
2361 | } |
2362 | } |
2363 | break; |
2364 | case Phase::WAIT_PENDING: |
2365 | assert(current_state.action != Action::CheckpointIndex); |
2366 | // Handle IN_PROGRESS -> WAIT_PENDING and WAIT_PENDING -> WAIT_PENDING |
2367 | if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_PENDING)) { |
2368 | if(prev_thread_ctx().pending_ios.empty() && |
2369 | prev_thread_ctx().retry_requests.empty()) { |
2370 | // Thread ack that it has completed its pending I/Os. |
2371 | if(epoch_.FinishThreadPhase(Phase::WAIT_PENDING)) { |
2372 | GlobalMoveToNextState(current_state); |
2373 | } |
2374 | } |
2375 | } |
2376 | break; |
2377 | case Phase::WAIT_FLUSH: |
2378 | assert(current_state.action != Action::CheckpointIndex); |
2379 | // Handle WAIT_PENDING -> WAIT_FLUSH and WAIT_FLUSH -> WAIT_FLUSH |
2380 | if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_FLUSH)) { |
2381 | bool flushed; |
2382 | if(fold_over_snapshot) { |
2383 | flushed = hlog.flushed_until_address.load() >= checkpoint_.log_metadata.final_address; |
2384 | } else { |
2385 | flushed = checkpoint_.flush_pending.load() == 0; |
2386 | } |
2387 | if(flushed) { |
2388 | // write context info |
2389 | WriteCprContext(); |
// Thread ack that it has written its CPR context.
2391 | if(epoch_.FinishThreadPhase(Phase::WAIT_FLUSH)) { |
2392 | GlobalMoveToNextState(current_state); |
2393 | } |
2394 | } |
2395 | } |
2396 | break; |
2397 | case Phase::PERSISTENCE_CALLBACK: |
2398 | assert(current_state.action != Action::CheckpointIndex); |
2399 | // Handle WAIT_FLUSH -> PERSISTENCE_CALLBACK and PERSISTENCE_CALLBACK -> PERSISTENCE_CALLBACK |
2400 | if(previous_state.phase == Phase::WAIT_FLUSH) { |
2401 | // Persistence callback |
2402 | if(checkpoint_.hybrid_log_persistence_callback) { |
2403 | checkpoint_.hybrid_log_persistence_callback(Status::Ok, prev_thread_ctx().serial_num); |
2404 | } |
2405 | // Thread has finished checkpointing. |
2406 | thread_ctx().phase = Phase::REST; |
2407 | // Thread ack that it has finished checkpointing. |
2408 | if(epoch_.FinishThreadPhase(Phase::PERSISTENCE_CALLBACK)) { |
2409 | GlobalMoveToNextState(current_state); |
2410 | } |
2411 | } |
2412 | break; |
2413 | default: |
2414 | // nothing to do. |
2415 | break; |
2416 | } |
2417 | break; |
2418 | case Action::GC: |
2419 | switch(current_state.phase) { |
2420 | case Phase::GC_IO_PENDING: |
2421 | // Handle REST -> GC_IO_PENDING and GC_IO_PENDING -> GC_IO_PENDING. |
2422 | if(previous_state.phase == Phase::REST) { |
2423 | assert(prev_thread_ctx().retry_requests.empty()); |
2424 | assert(prev_thread_ctx().pending_ios.empty()); |
2425 | assert(prev_thread_ctx().io_responses.empty()); |
2426 | // Get a new thread context; keep track of the old one as "previous." |
2427 | thread_contexts_[Thread::id()].swap(); |
2428 | // initialize a new local context |
2429 | thread_ctx().Initialize(Phase::GC_IO_PENDING, current_state.version, |
2430 | prev_thread_ctx().guid, prev_thread_ctx().serial_num); |
2431 | } |
2432 | |
2433 | // See if the old thread context has completed its pending I/Os. |
2434 | if(!epoch_.HasThreadFinishedPhase(Phase::GC_IO_PENDING)) { |
2435 | if(prev_thread_ctx().pending_ios.empty() && |
2436 | prev_thread_ctx().retry_requests.empty()) { |
2437 | // Thread ack that it has completed its pending I/Os. |
2438 | if(epoch_.FinishThreadPhase(Phase::GC_IO_PENDING)) { |
2439 | GlobalMoveToNextState(current_state); |
2440 | } |
2441 | } |
2442 | } |
2443 | break; |
2444 | case Phase::GC_IN_PROGRESS: |
2445 | // Handle GC_IO_PENDING -> GC_IN_PROGRESS and GC_IN_PROGRESS -> GC_IN_PROGRESS. |
2446 | if(!epoch_.HasThreadFinishedPhase(Phase::GC_IN_PROGRESS)) { |
2447 | if(!CleanHashTableBuckets()) { |
2448 | // No more buckets for this thread to clean; thread has finished GC. |
2449 | thread_ctx().phase = Phase::REST; |
2450 | // Thread ack that it has finished GC. |
2451 | if(epoch_.FinishThreadPhase(Phase::GC_IN_PROGRESS)) { |
2452 | GlobalMoveToNextState(current_state); |
2453 | } |
2454 | } |
2455 | } |
2456 | break; |
2457 | default: |
2458 | assert(false); // not reached |
2459 | break; |
2460 | } |
2461 | break; |
2462 | case Action::GrowIndex: |
2463 | switch(current_state.phase) { |
2464 | case Phase::GROW_PREPARE: |
2465 | if(previous_state.phase == Phase::REST) { |
2466 | // Thread ack that we're going to grow the hash table. |
2467 | if(epoch_.FinishThreadPhase(Phase::GROW_PREPARE)) { |
2468 | GlobalMoveToNextState(current_state); |
2469 | } |
2470 | } else { |
2471 | // Wait for all other threads to finish their outstanding (synchronous) hash table |
2472 | // operations. |
2473 | std::this_thread::yield(); |
2474 | } |
2475 | break; |
2476 | case Phase::GROW_IN_PROGRESS: |
2477 | SplitHashTableBuckets(); |
2478 | break; |
2479 | } |
2480 | break; |
2481 | } |
2482 | thread_ctx().phase = current_state.phase; |
2483 | thread_ctx().version = current_state.version; |
2484 | previous_state = current_state; |
2485 | } while(previous_state != final_state); |
2486 | } |
2487 | |
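/// Takes a full checkpoint (fuzzy index + hybrid log). A minimal usage sketch -- the store
/// instance `store` and the callback bodies are hypothetical; the callbacks must be plain
/// function pointers (capture-less lambdas convert implicitly):
///
///   Guid token;
///   bool started = store.Checkpoint(
///     [](Status result) { /* index is persistent */ },
///     [](Status result, uint64_t persistent_serial_num) { /* log is persistent */ },
///     token);
///   // `started` is false if another checkpoint, GC, grow, or recovery is already in progress.
///   // Threads with open sessions are expected to keep refreshing so the CPR state machine can
///   // advance; `token` names the checkpoint directories on disk.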
2488 | template <class K, class V, class D> |
2489 | bool FasterKv<K, V, D>::Checkpoint(void(*index_persistence_callback)(Status result), |
2490 | void(*hybrid_log_persistence_callback)(Status result, |
2491 | uint64_t persistent_serial_num), Guid& token) { |
2492 | // Only one thread can initiate a checkpoint at a time. |
2493 | SystemState expected{ Action::None, Phase::REST, system_state_.load().version }; |
2494 | SystemState desired{ Action::CheckpointFull, Phase::REST, expected.version }; |
2495 | if(!system_state_.compare_exchange_strong(expected, desired)) { |
2496 | // Can't start a new checkpoint while a checkpoint or recovery is already in progress. |
2497 | return false; |
2498 | } |
2499 | // We are going to start a checkpoint. |
2500 | epoch_.ResetPhaseFinished(); |
2501 | // Initialize all contexts |
2502 | token = Guid::Create(); |
2503 | disk.CreateIndexCheckpointDirectory(token); |
2504 | disk.CreateCprCheckpointDirectory(token); |
2505 | // Obtain tail address for fuzzy index checkpoint |
2506 | if(!fold_over_snapshot) { |
2507 | checkpoint_.InitializeCheckpoint(token, desired.version, state_[resize_info_.version].size(), |
2508 | hlog.begin_address.load(), hlog.GetTailAddress(), true, |
2509 | hlog.flushed_until_address.load(), |
2510 | index_persistence_callback, |
2511 | hybrid_log_persistence_callback); |
2512 | } else { |
2513 | checkpoint_.InitializeCheckpoint(token, desired.version, state_[resize_info_.version].size(), |
2514 | hlog.begin_address.load(), hlog.GetTailAddress(), false, |
2515 | Address::kInvalidAddress, index_persistence_callback, |
2516 | hybrid_log_persistence_callback); |
2517 | |
2518 | } |
2519 | InitializeCheckpointLocks(); |
2520 | // Let other threads know that the checkpoint has started. |
2521 | system_state_.store(desired.GetNextState()); |
2522 | return true; |
2523 | } |
2524 | |
2525 | template <class K, class V, class D> |
2526 | bool FasterKv<K, V, D>::CheckpointIndex(void(*index_persistence_callback)(Status result), |
2527 | Guid& token) { |
2528 | // Only one thread can initiate a checkpoint at a time. |
2529 | SystemState expected{ Action::None, Phase::REST, system_state_.load().version }; |
2530 | SystemState desired{ Action::CheckpointIndex, Phase::REST, expected.version }; |
2531 | if(!system_state_.compare_exchange_strong(expected, desired)) { |
2532 | // Can't start a new checkpoint while a checkpoint or recovery is already in progress. |
2533 | return false; |
2534 | } |
2535 | // We are going to start a checkpoint. |
2536 | epoch_.ResetPhaseFinished(); |
2537 | // Initialize all contexts |
2538 | token = Guid::Create(); |
2539 | disk.CreateIndexCheckpointDirectory(token); |
2540 | checkpoint_.InitializeIndexCheckpoint(token, desired.version, |
2541 | state_[resize_info_.version].size(), |
2542 | hlog.begin_address.load(), hlog.GetTailAddress(), |
2543 | index_persistence_callback); |
2544 | // Let other threads know that the checkpoint has started. |
2545 | system_state_.store(desired.GetNextState()); |
2546 | return true; |
2547 | } |
2548 | |
2549 | template <class K, class V, class D> |
2550 | bool FasterKv<K, V, D>::CheckpointHybridLog(void(*hybrid_log_persistence_callback)(Status result, |
2551 | uint64_t persistent_serial_num), Guid& token) { |
2552 | // Only one thread can initiate a checkpoint at a time. |
2553 | SystemState expected{ Action::None, Phase::REST, system_state_.load().version }; |
2554 | SystemState desired{ Action::CheckpointHybridLog, Phase::REST, expected.version }; |
2555 | if(!system_state_.compare_exchange_strong(expected, desired)) { |
2556 | // Can't start a new checkpoint while a checkpoint or recovery is already in progress. |
2557 | return false; |
2558 | } |
2559 | // We are going to start a checkpoint. |
2560 | epoch_.ResetPhaseFinished(); |
2561 | // Initialize all contexts |
2562 | token = Guid::Create(); |
2563 | disk.CreateCprCheckpointDirectory(token); |
2564 | // Obtain tail address for fuzzy index checkpoint |
2565 | if(!fold_over_snapshot) { |
2566 | checkpoint_.InitializeHybridLogCheckpoint(token, desired.version, true, |
2567 | hlog.flushed_until_address.load(), hybrid_log_persistence_callback); |
2568 | } else { |
2569 | checkpoint_.InitializeHybridLogCheckpoint(token, desired.version, false, |
2570 | Address::kInvalidAddress, hybrid_log_persistence_callback); |
2571 | } |
2572 | InitializeCheckpointLocks(); |
2573 | // Let other threads know that the checkpoint has started. |
2574 | system_state_.store(desired.GetNextState()); |
2575 | return true; |
2576 | } |
2577 | |
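/// Recovers the store from an index checkpoint plus a hybrid-log checkpoint. A minimal usage
/// sketch -- the store instance `store` is hypothetical, and the tokens would normally come from
/// an earlier Checkpoint() call:
///
///   uint32_t version;
///   std::vector<Guid> session_ids;
///   Status status = store.Recover(index_token, hybrid_log_token, version, session_ids);
///   if(status == Status::Ok) {
///     // `version` is the recovered checkpoint version; `session_ids` lists the sessions that
///     // were active at checkpoint time and can be continued from their persisted serial
///     // numbers.
///   }
///
/// Returns Status::Aborted if another action is already in progress.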
2578 | template <class K, class V, class D> |
2579 | Status FasterKv<K, V, D>::Recover(const Guid& index_token, const Guid& hybrid_log_token, |
2580 | uint32_t& version, |
2581 | std::vector<Guid>& session_ids) { |
2582 | version = 0; |
2583 | session_ids.clear(); |
2584 | SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version }; |
2585 | if(!system_state_.compare_exchange_strong(expected, |
2586 | SystemState{ Action::Recover, Phase::REST, expected.version })) { |
2587 | return Status::Aborted; |
2588 | } |
2589 | checkpoint_.InitializeRecover(index_token, hybrid_log_token); |
2590 | Status status; |
2591 | #define BREAK_NOT_OK(s) \ |
2592 | status = (s); \ |
2593 | if (status != Status::Ok) break |
2594 | |
2595 | do { |
2596 | // Index and log metadata. |
2597 | BREAK_NOT_OK(ReadIndexMetadata(index_token)); |
2598 | BREAK_NOT_OK(ReadCprMetadata(hybrid_log_token)); |
2599 | if(checkpoint_.index_metadata.version != checkpoint_.log_metadata.version) { |
2600 | // Index and hybrid-log checkpoints should have the same version. |
2601 | status = Status::Corruption; |
2602 | break; |
2603 | } |
2604 | |
2605 | system_state_.store(SystemState{ Action::Recover, Phase::REST, |
2606 | checkpoint_.log_metadata.version + 1 }); |
2607 | |
2608 | BREAK_NOT_OK(ReadCprContexts(hybrid_log_token, checkpoint_.log_metadata.guids)); |
2609 | // The index itself (including overflow buckets). |
2610 | BREAK_NOT_OK(RecoverFuzzyIndex()); |
2611 | BREAK_NOT_OK(RecoverFuzzyIndexComplete(true)); |
2612 | // Any changes made to the log while the index was being fuzzy-checkpointed. |
2613 | if(fold_over_snapshot) { |
2614 | BREAK_NOT_OK(RecoverHybridLog()); |
2615 | } else { |
2616 | BREAK_NOT_OK(RecoverHybridLogFromSnapshotFile()); |
2617 | } |
2618 | BREAK_NOT_OK(RestoreHybridLog()); |
2619 | } while(false); |
2620 | if(status == Status::Ok) { |
2621 | for(const auto& token : checkpoint_.continue_tokens) { |
2622 | session_ids.push_back(token.first); |
2623 | } |
2624 | version = checkpoint_.log_metadata.version; |
2625 | } |
2626 | checkpoint_.RecoverDone(); |
2627 | system_state_.store(SystemState{ Action::None, Phase::REST, |
2628 | checkpoint_.log_metadata.version + 1 }); |
2629 | return status; |
2630 | #undef BREAK_NOT_OK |
2631 | } |
2632 | |
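/// Starts log garbage collection by shifting the log's begin address forward to `address`. All
/// records below the new begin address become unreachable: active threads first drain pending
/// I/Os that might still reference the truncated region (GC_IO_PENDING), then the log device is
/// truncated (forwarding `truncate_callback` to hlog.Truncate()) and the hash table is scanned to
/// drop entries that now point below the begin address (GC_IN_PROGRESS). `complete_callback`, if
/// provided, runs once GC finishes. Returns false if another action is already in progress.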
2633 | template <class K, class V, class D> |
2634 | bool FasterKv<K, V, D>::ShiftBeginAddress(Address address, |
2635 | GcState::truncate_callback_t truncate_callback, |
2636 | GcState::complete_callback_t complete_callback) { |
2637 | SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version }; |
2638 | if(!system_state_.compare_exchange_strong(expected, |
2639 | SystemState{ Action::GC, Phase::REST, expected.version })) { |
2640 | // Can't start a GC while an action is already in progress. |
2641 | return false; |
2642 | } |
2643 | hlog.begin_address.store(address); |
2644 | // Each active thread will notify the epoch when all pending I/Os have completed. |
2645 | epoch_.ResetPhaseFinished(); |
2646 | uint64_t num_chunks = std::max(state_[resize_info_.version].size() / kGcHashTableChunkSize, |
2647 | (uint64_t)1); |
2648 | gc_.Initialize(truncate_callback, complete_callback, num_chunks); |
2649 | // Let other threads know to complete their pending I/Os, so that the log can be truncated. |
2650 | system_state_.store(SystemState{ Action::GC, Phase::GC_IO_PENDING, expected.version }); |
2651 | return true; |
2652 | } |
2653 | |
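/// Doubles the hash table. The next table version is allocated at twice the current size, and
/// every thread that refreshes during GROW_IN_PROGRESS helps split chunks of buckets via
/// SplitHashTableBuckets(); once the last chunk is done, the old table is freed and
/// `caller_callback` (if provided) is invoked with the new table size. Returns false if another
/// action is already in progress.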
2654 | template <class K, class V, class D> |
2655 | bool FasterKv<K, V, D>::GrowIndex(GrowState::callback_t caller_callback) { |
2656 | SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version }; |
2657 | if(!system_state_.compare_exchange_strong(expected, |
2658 | SystemState{ Action::GrowIndex, Phase::REST, expected.version })) { |
2659 | // An action is already in progress. |
2660 | return false; |
2661 | } |
2662 | epoch_.ResetPhaseFinished(); |
2663 | uint8_t current_version = resize_info_.version; |
2664 | assert(current_version == 0 || current_version == 1); |
2665 | uint8_t next_version = 1 - current_version; |
2666 | uint64_t num_chunks = std::max(state_[current_version].size() / kGrowHashTableChunkSize, |
2667 | (uint64_t)1); |
2668 | grow_.Initialize(caller_callback, current_version, num_chunks); |
2669 | // Initialize the next version of our hash table to be twice the size of the current version. |
2670 | state_[next_version].Initialize(state_[current_version].size() * 2, disk.log().alignment()); |
2671 | overflow_buckets_allocator_[next_version].Initialize(disk.log().alignment(), epoch_); |
2672 | |
2673 | SystemState next = SystemState{ Action::GrowIndex, Phase::GROW_PREPARE, expected.version }; |
2674 | system_state_.store(next); |
2675 | |
2676 | // Let this thread know it should be growing the index. |
2677 | Refresh(); |
2678 | return true; |
2679 | } |
2680 | |
}  // namespace core
}  // namespace FASTER