1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
6#include <experimental/filesystem>
7
8using namespace FASTER;
9
10/// Disk's log uses 32 MB segments.
11typedef FASTER::device::FileSystemDisk<handler_t, 33554432L> disk_t;
12typedef FASTER::device::FileSystemFile<handler_t> file_t;
13
14TEST(CLASS, MallocFixedPageSize) {
15 typedef MallocFixedPageSize<HashBucket, disk_t> alloc_t;
16
17 // Test copied from C#, RecoveryTest.cs.
18 std::random_device rd{};
19 uint32_t seed = rd();
20 std::mt19937_64 rng{ seed };
21 std::experimental::filesystem::create_directories("test_ofb");
22
23 size_t num_bytes_written;
24
25 LightEpoch epoch;
26 alloc_t allocator{};
27 allocator.Initialize(512, epoch);
28
29 size_t num_buckets_to_add = 2 * FixedPage<HashBucket>::kPageSize + 5;
30
31 FixedPageAddress* buckets = new FixedPageAddress[num_buckets_to_add];
32
33 {
34 disk_t checkpoint_disk{ "test_ofb", epoch };
35 file_t checkpoint_file = checkpoint_disk.NewFile("test_ofb.dat");
36 Status result = checkpoint_file.Open(&checkpoint_disk.handler());
37 ASSERT_EQ(Status::Ok, result);
38
39 //do something
40 for(size_t bucket_idx = 0; bucket_idx < num_buckets_to_add; ++bucket_idx) {
41 buckets[bucket_idx] = allocator.Allocate();
42 HashBucket& bucket = allocator.Get(buckets[bucket_idx]);
43 for(size_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
44 HashBucketEntry expected{ 0 };
45 uint64_t random_num = rng();
46 bool success = bucket.entries[entry_idx].compare_exchange_strong(expected, random_num);
47 ASSERT_TRUE(success);
48 }
49 HashBucketOverflowEntry expected{ 0 };
50 uint64_t random_num = rng();
51 bool success = bucket.overflow_entry.compare_exchange_strong(expected, random_num);
52 ASSERT_TRUE(success);
53 }
54 //issue call to checkpoint
55 result = allocator.Checkpoint(checkpoint_disk, std::move(checkpoint_file), num_bytes_written);
56 ASSERT_EQ(Status::Ok, result);
57 // (All the bucket we allocated, + the null page.)
58 ASSERT_EQ((num_buckets_to_add + 1) * sizeof(HashBucket), num_bytes_written);
59 //wait until complete
60 result = allocator.CheckpointComplete(true);
61 ASSERT_EQ(Status::Ok, result);
62 }
63
64 LightEpoch recover_epoch;
65 alloc_t recover_allocator{};
66 recover_allocator.Initialize(512, recover_epoch);
67 disk_t recover_disk{ "test_ofb", recover_epoch };
68 file_t recover_file = recover_disk.NewFile("test_ofb.dat");
69 Status result = recover_file.Open(&recover_disk.handler());
70 ASSERT_EQ(Status::Ok, result);
71
72 //issue call to recover
73 result = recover_allocator.Recover(recover_disk, std::move(recover_file), num_bytes_written,
74 num_bytes_written / sizeof(typename alloc_t::item_t));
75 ASSERT_EQ(Status::Ok, result);
76 //wait until complete
77 result = recover_allocator.RecoverComplete(true);
78 ASSERT_EQ(Status::Ok, result);
79
80 //verify that something
81 std::mt19937_64 rng2{ seed };
82 for(size_t bucket_idx = 0; bucket_idx < num_buckets_to_add; ++bucket_idx) {
83 HashBucket& bucket = allocator.Get(buckets[bucket_idx]);
84 for(size_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
85 uint64_t random_num = rng2();
86 ASSERT_EQ(random_num, bucket.entries[entry_idx].load().control_);
87 }
88 uint64_t random_num = rng2();
89 ASSERT_EQ(random_num, bucket.overflow_entry.load().control_);
90 }
91
92 FixedPageAddress address = recover_allocator.Allocate();
93 ASSERT_EQ(FixedPageAddress{ num_buckets_to_add + 1 }, address);
94
95 delete[] buckets;
96}
97
98TEST(CLASS, InternalHashTable) {
99 // (Just the hash table itself--no overflow buckets.)
100 std::random_device rd{};
101 uint32_t seed = rd();
102 std::mt19937_64 rng{ seed };
103 std::experimental::filesystem::create_directories("test_ht");
104
105 constexpr uint64_t kNumBuckets = 8388608/8;
106 size_t num_bytes_written;
107 {
108 LightEpoch epoch;
109 disk_t checkpoint_disk{ "test_ht", epoch };
110 file_t checkpoint_file = checkpoint_disk.NewFile("test_ht.dat");
111 Status result = checkpoint_file.Open(&checkpoint_disk.handler());
112 ASSERT_EQ(Status::Ok, result);
113
114 InternalHashTable<disk_t> table{};
115 table.Initialize(kNumBuckets, checkpoint_file.alignment());
116
117 //do something
118 for(size_t bucket_idx = 0; bucket_idx < kNumBuckets; ++bucket_idx) {
119 for(size_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
120 HashBucketEntry expected{ 0 };
121 bool success = table.bucket(bucket_idx).entries[entry_idx].compare_exchange_strong(
122 expected, rng());
123 ASSERT_TRUE(success);
124 }
125 HashBucketOverflowEntry expected{ 0 };
126 bool success = table.bucket(bucket_idx).overflow_entry.compare_exchange_strong(expected,
127 rng());
128 ASSERT_TRUE(success);
129 }
130
131 //issue call to checkpoint
132 result = table.Checkpoint(checkpoint_disk, std::move(checkpoint_file), num_bytes_written);
133 ASSERT_EQ(Status::Ok, result);
134 // (All the bucket we allocated, + the null page.)
135 ASSERT_EQ(kNumBuckets * sizeof(HashBucket), num_bytes_written);
136 //wait until complete
137 result = table.CheckpointComplete(true);
138 ASSERT_EQ(Status::Ok, result);
139 }
140
141 LightEpoch epoch;
142 disk_t recover_disk{ "test_ht", epoch };
143 file_t recover_file = recover_disk.NewFile("test_ht.dat");
144 Status result = recover_file.Open(&recover_disk.handler());
145 ASSERT_EQ(Status::Ok, result);
146
147 InternalHashTable<disk_t> recover_table{};
148 //issue call to recover
149 result = recover_table.Recover(recover_disk, std::move(recover_file), num_bytes_written);
150 ASSERT_EQ(Status::Ok, result);
151 //wait until complete
152 result = recover_table.RecoverComplete(true);
153 ASSERT_EQ(Status::Ok, result);
154
155 //verify that something
156 std::mt19937_64 rng2{ seed };
157 for(size_t bucket_idx = 0; bucket_idx < kNumBuckets; ++bucket_idx) {
158 for(size_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
159 uint64_t random_num = rng2();
160 ASSERT_EQ(random_num, recover_table.bucket(bucket_idx).entries[entry_idx].load().control_);
161 }
162 uint64_t random_num = rng2();
163 ASSERT_EQ(random_num, recover_table.bucket(bucket_idx).overflow_entry.load().control_);
164 }
165}
166
167TEST(CLASS, Serial) {
168 class Key {
169 public:
170 Key(uint32_t key)
171 : key_{ key } {
172 }
173
174 inline static constexpr uint32_t size() {
175 return static_cast<uint32_t>(sizeof(Key));
176 }
177 inline KeyHash GetHash() const {
178 std::hash<uint32_t> hash_fn{};
179 return KeyHash{ hash_fn(key_) };
180 }
181
182 /// Comparison operators.
183 inline bool operator==(const Key& other) const {
184 return key_ == other.key_;
185 }
186 inline bool operator!=(const Key& other) const {
187 return key_ != other.key_;
188 }
189
190 private:
191 uint32_t key_;
192 };
193 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
194 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
195
196 class UpsertContext1;
197 class UpsertContext2;
198 class ReadContext1;
199 class ReadContext2;
200
201 class Value1 {
202 public:
203 inline uint32_t size() const {
204 return size_;
205 }
206
207 friend class UpsertContext1;
208 friend class UpsertContext2;
209 friend class ReadContext1;
210
211 private:
212 uint16_t size_;
213 union {
214 std::atomic<uint32_t> atomic_val1_;
215 uint32_t val1_;
216 };
217 };
218 static_assert(sizeof(Value1) == 8, "sizeof(Value1) != 8");
219 static_assert(alignof(Value1) == 4, "alignof(Value1) != 4");
220
221 class Value2 : public Value1 {
222 public:
223 friend class UpsertContext2;
224 friend class ReadContext2;
225
226 private:
227 union {
228 std::atomic<uint16_t> atomic_val2_;
229 uint16_t val2_;
230 };
231 uint8_t wasted_space[3];
232 };
233 static_assert(sizeof(Value2) == 16, "sizeof(Value2) != 16");
234 static_assert(alignof(Value2) == 4, "alignof(Value2) != 4");
235
236 class UpsertContext1 : public IAsyncContext {
237 public:
238 typedef Key key_t;
239 typedef Value1 value_t;
240
241 UpsertContext1(const Key& key, uint32_t val)
242 : key_{ key }
243 , val_{ val } {
244 }
245
246 /// Copy (and deep-copy) constructor.
247 UpsertContext1(const UpsertContext1& other)
248 : key_{ other.key_ }
249 , val_{ other.val_ } {
250 }
251
252 /// The implicit and explicit interfaces require a key() accessor.
253 inline const Key& key() const {
254 return key_;
255 }
256 inline static constexpr uint32_t value_size() {
257 return sizeof(value_t);
258 }
259 /// Non-atomic and atomic Put() methods.
260 inline void Put(Value1& value) {
261 value.size_ = sizeof(value);
262 value.val1_ = val_;
263 }
264 inline bool PutAtomic(Value1& value) {
265 EXPECT_EQ(value.size_, sizeof(value));
266 value.atomic_val1_.store(val_);
267 return true;
268 }
269
270 protected:
271 /// The explicit interface requires a DeepCopy_Internal() implementation.
272 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
273 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
274 }
275
276 private:
277 Key key_;
278 uint32_t val_;
279 };
280
281 class UpsertContext2 : public IAsyncContext {
282 public:
283 typedef Key key_t;
284 typedef Value2 value_t;
285
286 UpsertContext2(const Key& key, uint16_t val)
287 : key_{ key }
288 , val_{ val } {
289 }
290
291 /// Copy (and deep-copy) constructor.
292 UpsertContext2(const UpsertContext2& other)
293 : key_{ other.key_ }
294 , val_{ other.val_ } {
295 }
296
297 /// The implicit and explicit interfaces require a key() accessor.
298 inline const Key& key() const {
299 return key_;
300 }
301 inline static constexpr uint32_t value_size() {
302 return sizeof(value_t);
303 }
304 /// Non-atomic and atomic Put() methods.
305 inline void Put(Value2& value) {
306 value.size_ = sizeof(value);
307 value.val2_ = val_;
308 }
309 inline bool PutAtomic(Value2& value) {
310 EXPECT_EQ(value.size_, sizeof(value));
311 value.atomic_val2_.store(val_);
312 return true;
313 }
314
315 protected:
316 /// The explicit interface requires a DeepCopy_Internal() implementation.
317 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
318 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
319 }
320
321 private:
322 Key key_;
323 uint16_t val_;
324 };
325
326 class ReadContext1 : public IAsyncContext {
327 public:
328 typedef Key key_t;
329 typedef Value1 value_t;
330
331 ReadContext1(Key key, uint32_t expected_)
332 : key_{ key }
333 , val_{ 0 }
334 , expected{ expected_ } {
335 }
336
337 /// Copy (and deep-copy) constructor.
338 ReadContext1(const ReadContext1& other)
339 : key_{ other.key_ }
340 , val_{ other.val_ }
341 , expected{ other.expected } {
342 }
343
344 /// The implicit and explicit interfaces require a key() accessor.
345 inline const Key& key() const {
346 return key_;
347 }
348
349 inline void Get(const Value1& value) {
350 val_ = value.val1_;
351 }
352 inline void GetAtomic(const Value1& value) {
353 val_ = value.atomic_val1_.load();
354 }
355
356 uint64_t val() const {
357 return val_;
358 }
359
360 protected:
361 /// The explicit interface requires a DeepCopy_Internal() implementation.
362 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
363 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
364 }
365
366 private:
367 Key key_;
368 uint32_t val_;
369 public:
370 const uint32_t expected;
371 };
372
373 class ReadContext2 : public IAsyncContext {
374 public:
375 typedef Key key_t;
376 typedef Value2 value_t;
377
378 ReadContext2(Key key, uint16_t expected_)
379 : key_{ key }
380 , val_{ 0 }
381 , expected{ expected_ } {
382 }
383
384 /// Copy (and deep-copy) constructor.
385 ReadContext2(const ReadContext2& other)
386 : key_{ other.key_ }
387 , val_{ other.val_ }
388 , expected{ other.expected } {
389 }
390
391 /// The implicit and explicit interfaces require a key() accessor.
392 inline const Key& key() const {
393 return key_;
394 }
395
396 inline void Get(const Value2& value) {
397 val_ = value.val2_;
398 }
399 inline void GetAtomic(const Value2& value) {
400 val_ = value.atomic_val2_.load();
401 }
402
403 uint64_t val() const {
404 return val_;
405 }
406
407 protected:
408 /// The explicit interface requires a DeepCopy_Internal() implementation.
409 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
410 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
411 }
412
413 private:
414 Key key_;
415 uint16_t val_;
416 public:
417 const uint16_t expected;
418 };
419
420 auto upsert_callback = [](IAsyncContext* context, Status result) {
421 // Upserts don't go to disk.
422 ASSERT_TRUE(false);
423 };
424
425 std::experimental::filesystem::create_directories("storage");
426
427 static constexpr size_t kNumRecords = 600000;
428
429 Guid session_id;
430 Guid token;
431
432 {
433 // Populate and checkpoint the store.
434 // 6 pages!
435 FasterKv<Key, Value1, disk_t> store{ 524288, 201326592, "storage", 0.4 };
436
437 session_id = store.StartSession();
438
439 // upsert some records
440 assert(kNumRecords % 2 == 0);
441 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
442 {
443 UpsertContext1 context{ Key{ idx }, idx + 7 };
444 Status result = store.Upsert(context, upsert_callback, 1);
445 ASSERT_EQ(Status::Ok, result);
446 }
447 {
448 UpsertContext2 context{ Key{ idx + 1 }, 55 };
449 Status result = store.Upsert(context, upsert_callback, 1);
450 ASSERT_EQ(Status::Ok, result);
451 }
452 }
453 // verify them
454 static std::atomic<uint64_t> records_read;
455 records_read = 0;
456 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
457 auto callback1 = [](IAsyncContext* ctxt, Status result) {
458 CallbackContext<ReadContext1> context{ ctxt };
459 ASSERT_EQ(Status::Ok, result);
460 ++records_read;
461 ASSERT_EQ(context->expected, context->val());
462 };
463 auto callback2 = [](IAsyncContext* ctxt, Status result) {
464 CallbackContext<ReadContext2> context{ ctxt };
465 ASSERT_EQ(Status::Ok, result);
466 ++records_read;
467 ASSERT_EQ(context->expected, context->val());
468 };
469
470 if(idx % 256 == 0) {
471 store.Refresh();
472 store.CompletePending(false);
473 }
474
475 {
476 ReadContext1 context{ Key{ idx }, idx + 7 };
477 Status result = store.Read(context, callback1, 1);
478 if(result == Status::Ok) {
479 ++records_read;
480 ASSERT_EQ(context.expected, context.val());
481 } else {
482 ASSERT_EQ(Status::Pending, result);
483 }
484 }
485 {
486 ReadContext2 context{ Key{ idx + 1 }, 55 };
487 Status result = store.Read(context, callback2, 1);
488 if(result == Status::Ok) {
489 ++records_read;
490 ASSERT_EQ(context.expected, context.val());
491 } else {
492 ASSERT_EQ(Status::Pending, result);
493 }
494 }
495 }
496
497 static std::atomic<size_t> num_threads_persistent;
498 num_threads_persistent = 0;
499 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
500 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
501 threads_persistent[idx] = false;
502 }
503
504 auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
505 bool expected = false;
506 ASSERT_EQ(Status::Ok, result);
507 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected,
508 true));
509 ++num_threads_persistent;
510 };
511
512 // checkpoint (transition from REST to INDEX_CHKPT)
513 ASSERT_TRUE(store.Checkpoint(nullptr, hybrid_log_persistence_callback, token));
514
515 while(num_threads_persistent < 1) {
516 store.CompletePending(false);
517 }
518
519 bool result = store.CompletePending(true);
520 ASSERT_TRUE(result);
521 ASSERT_EQ(kNumRecords, records_read.load());
522
523 store.StopSession();
524 }
525
526 // Test recovery.
527 FasterKv<Key, Value1, disk_t> new_store{ 524288, 201326592, "storage", 0.4 };
528
529 uint32_t version;
530 std::vector<Guid> session_ids;
531 Status status = new_store.Recover(token, token, version, session_ids);
532 ASSERT_EQ(Status::Ok, status);
533 ASSERT_EQ(1, session_ids.size());
534 ASSERT_EQ(session_id, session_ids[0]);
535 ASSERT_EQ(1, new_store.ContinueSession(session_id));
536
537 // Verify the recovered store.
538 static std::atomic<uint64_t> records_read;
539 records_read = 0;
540 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
541 auto callback1 = [](IAsyncContext* ctxt, Status result) {
542 CallbackContext<ReadContext1> context{ ctxt };
543 ASSERT_EQ(Status::Ok, result) << *reinterpret_cast<const uint32_t*>(&context->key());
544 ++records_read;
545 ASSERT_EQ(context->expected, context->val());
546 };
547 auto callback2 = [](IAsyncContext* ctxt, Status result) {
548 CallbackContext<ReadContext2> context{ ctxt };
549 ASSERT_EQ(Status::Ok, result) << *reinterpret_cast<const uint32_t*>(&context->key());
550 ++records_read;
551 ASSERT_EQ(context->expected, context->val());
552 };
553
554 if(idx % 256 == 0) {
555 new_store.Refresh();
556 new_store.CompletePending(false);
557 }
558
559 {
560 ReadContext1 context{ Key{ idx }, idx + 7 };
561 Status result = new_store.Read(context, callback1, 1);
562 if(result == Status::Ok) {
563 ++records_read;
564 ASSERT_EQ(context.expected, context.val());
565 } else {
566 ASSERT_EQ(Status::Pending, result);
567 }
568 }
569 {
570 ReadContext2 context{ Key{ idx + 1 }, 55 };
571 Status result = new_store.Read(context, callback2, 1);
572 if(result == Status::Ok) {
573 ++records_read;
574 ASSERT_EQ(context.expected, context.val());
575 } else {
576 ASSERT_EQ(Status::Pending, result);
577 }
578 }
579 }
580
581 new_store.CompletePending(true);
582 ASSERT_EQ(records_read.load(), kNumRecords);
583 new_store.StopSession();
584
585 session_id = new_store.StartSession();
586
587 // Upsert some changes and verify them.
588 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
589 {
590 UpsertContext1 context{ Key{ idx }, idx + 55 };
591 Status result = new_store.Upsert(context, upsert_callback, 1);
592 ASSERT_EQ(Status::Ok, result);
593 }
594 {
595 UpsertContext2 context{ Key{ idx + 1 }, 77 };
596 Status result = new_store.Upsert(context, upsert_callback, 1);
597 ASSERT_EQ(Status::Ok, result);
598 }
599 }
600 records_read = 0;
601 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
602 auto callback1 = [](IAsyncContext* ctxt, Status result) {
603 CallbackContext<ReadContext1> context{ ctxt };
604 ASSERT_EQ(Status::Ok, result);
605 ++records_read;
606 ASSERT_EQ(context->expected, context->val());
607 };
608 auto callback2 = [](IAsyncContext* ctxt, Status result) {
609 CallbackContext<ReadContext2> context{ ctxt };
610 ASSERT_EQ(Status::Ok, result);
611 ++records_read;
612 ASSERT_EQ(context->expected, context->val());
613 };
614
615 if(idx % 256 == 0) {
616 new_store.Refresh();
617 new_store.CompletePending(false);
618 }
619
620 {
621 ReadContext1 context{ Key{ idx }, idx + 55 };
622 Status result = new_store.Read(context, callback1, 1);
623 if(result == Status::Ok) {
624 ++records_read;
625 ASSERT_EQ(context.expected, context.val());
626 } else {
627 ASSERT_EQ(Status::Pending, result);
628 }
629 }
630 {
631 ReadContext2 context{ Key{ idx + 1 }, 77 };
632 Status result = new_store.Read(context, callback2, 1);
633 if(result == Status::Ok) {
634 ++records_read;
635 ASSERT_EQ(context.expected, context.val());
636 } else {
637 ASSERT_EQ(Status::Pending, result);
638 }
639 }
640 }
641
642 new_store.CompletePending(true);
643 ASSERT_EQ(records_read.load(), kNumRecords);
644 new_store.StopSession();
645}
646
647TEST(CLASS, Serial_VariableLengthKey) {
648 class alignas(4) Key {
649 public:
650 Key(uint8_t len, uint32_t fill)
651 : len_{ len } {
652 for(uint8_t idx = 0; idx < len_; ++idx) {
653 buffer()[idx] = fill;
654 }
655 }
656
657 /// Copy constructor.
658 Key(const Key& other)
659 : len_{ other.len_ } {
660 std::memcpy(buffer(), other.buffer(), len_ * sizeof(uint32_t));
661 }
662
663 inline uint32_t size() const {
664 return sizeof(*this) + (len_ * sizeof(uint32_t));
665 }
666 private:
667 inline uint32_t* buffer() {
668 return reinterpret_cast<uint32_t*>(this + 1);
669 }
670 public:
671 inline const uint32_t* buffer() const {
672 return reinterpret_cast<const uint32_t*>(this + 1);
673 }
674 inline KeyHash GetHash() const {
675 return KeyHash{ Utility::HashBytes(
676 reinterpret_cast<const uint16_t*>(buffer()), len_ * 2) };
677 }
678
679 /// Comparison operators.
680 inline bool operator==(const Key& other) const {
681 return len_ == other.len_ &&
682 std::memcmp(buffer(), other.buffer(), len_ * sizeof(uint32_t)) == 0;
683 }
684 inline bool operator!=(const Key& other) const {
685 return len_ != other.len_ ||
686 std::memcmp(buffer(), other.buffer(), len_ * sizeof(uint32_t)) != 0;
687 }
688
689 private:
690 uint8_t len_;
691
692 };
693 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
694 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
695
696 class UpsertContext1;
697 class UpsertContext2;
698 class ReadContext1;
699 class ReadContext2;
700
701 class Value1 {
702 public:
703 inline uint32_t size() const {
704 return size_;
705 }
706
707 friend class UpsertContext1;
708 friend class UpsertContext2;
709 friend class ReadContext1;
710
711 private:
712 uint16_t size_;
713 union {
714 std::atomic<uint32_t> atomic_val1_;
715 uint32_t val1_;
716 };
717 };
718 static_assert(sizeof(Value1) == 8, "sizeof(Value1) != 8");
719 static_assert(alignof(Value1) == 4, "alignof(Value1) != 4");
720
721 class Value2 : public Value1 {
722 public:
723 friend class UpsertContext2;
724 friend class ReadContext2;
725
726 private:
727 union {
728 std::atomic<uint16_t> atomic_val2_;
729 uint16_t val2_;
730 };
731 uint8_t wasted_space[3];
732 };
733 static_assert(sizeof(Value2) == 16, "sizeof(Value2) != 16");
734 static_assert(alignof(Value2) == 4, "alignof(Value2) != 4");
735
736 class UpsertContext1 : public IAsyncContext {
737 public:
738 typedef Key key_t;
739 typedef Value1 value_t;
740
741 UpsertContext1(uint32_t key, uint32_t val)
742 : val_{ val } {
743 uint8_t len = (key % 16) + 1;
744 key_ = alloc_context<key_t>(sizeof(key_t) + (len * sizeof(uint32_t)));
745 new(key_.get()) key_t{ len, key };
746 }
747
748 /// Deep-copy constructor.
749 UpsertContext1(UpsertContext1& other)
750 : key_{ std::move(other.key_) }
751 , val_{ other.val_ } {
752 }
753
754 /// The implicit and explicit interfaces require a key() accessor.
755 inline const Key& key() const {
756 return *key_.get();
757 }
758 inline static constexpr uint32_t value_size() {
759 return sizeof(value_t);
760 }
761 /// Non-atomic and atomic Put() methods.
762 inline void Put(Value1& value) {
763 value.size_ = sizeof(value);
764 value.val1_ = val_;
765 }
766 inline bool PutAtomic(Value1& value) {
767 EXPECT_EQ(value.size_, sizeof(value));
768 value.atomic_val1_.store(val_);
769 return true;
770 }
771
772 protected:
773 /// The explicit interface requires a DeepCopy_Internal() implementation.
774 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
775 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
776 }
777
778 private:
779 context_unique_ptr_t<key_t> key_;
780 uint32_t val_;
781 };
782
783 class UpsertContext2 : public IAsyncContext {
784 public:
785 typedef Key key_t;
786 typedef Value2 value_t;
787
788 UpsertContext2(uint32_t key, uint16_t val)
789 : val_{ val } {
790 uint8_t len = (key % 16) + 1;
791 key_ = alloc_context<key_t>(sizeof(key_t) + (len * sizeof(uint32_t)));
792 new(key_.get()) key_t{ len, key };
793 }
794
795 /// Deep-copy constructor.
796 UpsertContext2(UpsertContext2& other)
797 : key_{ std::move(other.key_) }
798 , val_{ other.val_ } {
799 }
800
801 /// The implicit and explicit interfaces require a key() accessor.
802 inline const Key& key() const {
803 return *key_.get();
804 }
805 inline static constexpr uint32_t value_size() {
806 return sizeof(value_t);
807 }
808 /// Non-atomic and atomic Put() methods.
809 inline void Put(Value2& value) {
810 value.size_ = sizeof(value);
811 value.val2_ = val_;
812 }
813 inline bool PutAtomic(Value2& value) {
814 EXPECT_EQ(value.size_, sizeof(value));
815 value.atomic_val2_.store(val_);
816 return true;
817 }
818
819 protected:
820 /// The explicit interface requires a DeepCopy_Internal() implementation.
821 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
822 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
823 }
824
825 private:
826 context_unique_ptr_t<key_t> key_;
827 uint16_t val_;
828 };
829
830 class ReadContext1 : public IAsyncContext {
831 public:
832 typedef Key key_t;
833 typedef Value1 value_t;
834
835 ReadContext1(uint32_t key, uint32_t expected_)
836 : val_{ 0 }
837 , expected{ expected_ } {
838 uint8_t len = (key % 16) + 1;
839 key_ = alloc_context<key_t>(sizeof(key_t) + (len * sizeof(uint32_t)));
840 new(key_.get()) key_t{ len, key };
841 }
842
843 /// Deep-copy constructor.
844 ReadContext1(ReadContext1& other)
845 : key_{ std::move(other.key_) }
846 , val_{ other.val_ }
847 , expected{ other.expected } {
848 }
849
850 /// The implicit and explicit interfaces require a key() accessor.
851 inline const Key& key() const {
852 return *key_.get();
853 }
854
855 inline void Get(const Value1& value) {
856 val_ = value.val1_;
857 }
858 inline void GetAtomic(const Value1& value) {
859 val_ = value.atomic_val1_.load();
860 }
861
862 uint64_t val() const {
863 return val_;
864 }
865
866 protected:
867 /// The explicit interface requires a DeepCopy_Internal() implementation.
868 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
869 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
870 }
871
872 private:
873 context_unique_ptr_t<key_t> key_;
874 uint32_t val_;
875 public:
876 const uint32_t expected;
877 };
878
879 class ReadContext2 : public IAsyncContext {
880 public:
881 typedef Key key_t;
882 typedef Value2 value_t;
883
884 ReadContext2(uint32_t key, uint16_t expected_)
885 : val_{ 0 }
886 , expected{ expected_ } {
887 uint8_t len = (key % 16) + 1;
888 key_ = alloc_context<key_t>(sizeof(key_t) + (len * sizeof(uint32_t)));
889 new(key_.get()) key_t{ len, key };
890 }
891
892 /// Deep-copy constructor.
893 ReadContext2(ReadContext2& other)
894 : key_{ std::move(other.key_) }
895 , val_{ other.val_ }
896 , expected{ other.expected } {
897 }
898
899 /// The implicit and explicit interfaces require a key() accessor.
900 inline const Key& key() const {
901 return *key_.get();
902 }
903
904 inline void Get(const Value2& value) {
905 val_ = value.val2_;
906 }
907 inline void GetAtomic(const Value2& value) {
908 val_ = value.atomic_val2_.load();
909 }
910
911 uint64_t val() const {
912 return val_;
913 }
914
915 protected:
916 /// The explicit interface requires a DeepCopy_Internal() implementation.
917 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
918 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
919 }
920
921 private:
922 context_unique_ptr_t<key_t> key_;
923 uint16_t val_;
924 public:
925 const uint16_t expected;
926 };
927
928 auto upsert_callback = [](IAsyncContext* context, Status result) {
929 // Upserts don't go to disk.
930 ASSERT_TRUE(false);
931 };
932
933 std::experimental::filesystem::create_directories("storage");
934
935 static constexpr size_t kNumRecords = 600000;
936
937 Guid session_id;
938 Guid token;
939
940 {
941 // Populate and checkpoint the store.
942 // 6 pages!
943 FasterKv<Key, Value1, disk_t> store{ 524288, 201326592, "storage", 0.4 };
944
945 session_id = store.StartSession();
946
947 // upsert some records
948 assert(kNumRecords % 2 == 0);
949 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
950 {
951 UpsertContext1 context{ idx, idx + 7 };
952 Status result = store.Upsert(context, upsert_callback, 1);
953 ASSERT_EQ(Status::Ok, result);
954 }
955 {
956 UpsertContext2 context{ idx + 1, 55 };
957 Status result = store.Upsert(context, upsert_callback, 1);
958 ASSERT_EQ(Status::Ok, result);
959 }
960 }
961 // verify them
962 static std::atomic<uint64_t> records_read;
963 records_read = 0;
964 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
965 auto callback1 = [](IAsyncContext* ctxt, Status result) {
966 CallbackContext<ReadContext1> context{ ctxt };
967 ASSERT_EQ(Status::Ok, result);
968 ++records_read;
969 ASSERT_EQ(context->expected, context->val());
970 };
971 auto callback2 = [](IAsyncContext* ctxt, Status result) {
972 CallbackContext<ReadContext2> context{ ctxt };
973 ASSERT_EQ(Status::Ok, result);
974 ++records_read;
975 ASSERT_EQ(context->expected, context->val());
976 };
977
978 if(idx % 256 == 0) {
979 store.Refresh();
980 store.CompletePending(false);
981 }
982
983 {
984 ReadContext1 context{ idx, idx + 7 };
985 Status result = store.Read(context, callback1, 1);
986 if(result == Status::Ok) {
987 ++records_read;
988 ASSERT_EQ(context.expected, context.val());
989 } else {
990 ASSERT_EQ(Status::Pending, result);
991 }
992 }
993 {
994 ReadContext2 context{ idx + 1, 55 };
995 Status result = store.Read(context, callback2, 1);
996 if(result == Status::Ok) {
997 ++records_read;
998 ASSERT_EQ(context.expected, context.val());
999 } else {
1000 ASSERT_EQ(Status::Pending, result);
1001 }
1002 }
1003 }
1004
1005 static std::atomic<size_t> num_threads_persistent;
1006 num_threads_persistent = 0;
1007 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
1008 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
1009 threads_persistent[idx] = false;
1010 }
1011
1012 auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
1013 bool expected = false;
1014 ASSERT_EQ(Status::Ok, result);
1015 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected,
1016 true));
1017 ++num_threads_persistent;
1018 };
1019
1020 // checkpoint (transition from REST to INDEX_CHKPT)
1021 ASSERT_TRUE(store.Checkpoint(nullptr, hybrid_log_persistence_callback, token));
1022
1023 while(num_threads_persistent < 1) {
1024 store.CompletePending(false);
1025 }
1026
1027 bool result = store.CompletePending(true);
1028 ASSERT_TRUE(result);
1029 ASSERT_EQ(kNumRecords, records_read.load());
1030
1031 store.StopSession();
1032 }
1033
1034 // Test recovery.
1035 FasterKv<Key, Value1, disk_t> new_store{ 524288, 201326592, "storage", 0.4 };
1036
1037 uint32_t version;
1038 std::vector<Guid> session_ids;
1039 Status status = new_store.Recover(token, token, version, session_ids);
1040 ASSERT_EQ(Status::Ok, status);
1041 ASSERT_EQ(1, session_ids.size());
1042 ASSERT_EQ(session_id, session_ids[0]);
1043 ASSERT_EQ(1, new_store.ContinueSession(session_id));
1044
1045 // Verify the recovered store.
1046 static std::atomic<uint64_t> records_read;
1047 records_read = 0;
1048 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
1049 auto callback1 = [](IAsyncContext* ctxt, Status result) {
1050 CallbackContext<ReadContext1> context{ ctxt };
1051 ASSERT_EQ(Status::Ok, result) << *reinterpret_cast<const uint32_t*>(&context->key());
1052 ++records_read;
1053 ASSERT_EQ(context->expected, context->val());
1054 };
1055 auto callback2 = [](IAsyncContext* ctxt, Status result) {
1056 CallbackContext<ReadContext2> context{ ctxt };
1057 ASSERT_EQ(Status::Ok, result) << *reinterpret_cast<const uint32_t*>(&context->key());
1058 ++records_read;
1059 ASSERT_EQ(context->expected, context->val());
1060 };
1061
1062 if(idx % 256 == 0) {
1063 new_store.Refresh();
1064 new_store.CompletePending(false);
1065 }
1066
1067 {
1068 ReadContext1 context{ idx, idx + 7 };
1069 Status result = new_store.Read(context, callback1, 1);
1070 if(result == Status::Ok) {
1071 ++records_read;
1072 ASSERT_EQ(context.expected, context.val());
1073 } else {
1074 ASSERT_EQ(Status::Pending, result);
1075 }
1076 }
1077 {
1078 ReadContext2 context{ idx + 1, 55 };
1079 Status result = new_store.Read(context, callback2, 1);
1080 if(result == Status::Ok) {
1081 ++records_read;
1082 ASSERT_EQ(context.expected, context.val());
1083 } else {
1084 ASSERT_EQ(Status::Pending, result);
1085 }
1086 }
1087 }
1088
1089 new_store.CompletePending(true);
1090 ASSERT_EQ(records_read.load(), kNumRecords);
1091 new_store.StopSession();
1092
1093 session_id = new_store.StartSession();
1094
1095 // Upsert some changes and verify them.
1096 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
1097 {
1098 UpsertContext1 context{ idx, idx + 55 };
1099 Status result = new_store.Upsert(context, upsert_callback, 1);
1100 ASSERT_EQ(Status::Ok, result);
1101 }
1102 {
1103 UpsertContext2 context{ idx + 1, 77 };
1104 Status result = new_store.Upsert(context, upsert_callback, 1);
1105 ASSERT_EQ(Status::Ok, result);
1106 }
1107 }
1108 records_read = 0;
1109 for(uint32_t idx = 0; idx < kNumRecords; idx += 2) {
1110 auto callback1 = [](IAsyncContext* ctxt, Status result) {
1111 CallbackContext<ReadContext1> context{ ctxt };
1112 ASSERT_EQ(Status::Ok, result);
1113 ++records_read;
1114 ASSERT_EQ(context->expected, context->val());
1115 };
1116 auto callback2 = [](IAsyncContext* ctxt, Status result) {
1117 CallbackContext<ReadContext2> context{ ctxt };
1118 ASSERT_EQ(Status::Ok, result);
1119 ++records_read;
1120 ASSERT_EQ(context->expected, context->val());
1121 };
1122
1123 if(idx % 256 == 0) {
1124 new_store.Refresh();
1125 new_store.CompletePending(false);
1126 }
1127
1128 {
1129 ReadContext1 context{ idx, idx + 55 };
1130 Status result = new_store.Read(context, callback1, 1);
1131 if(result == Status::Ok) {
1132 ++records_read;
1133 ASSERT_EQ(context.expected, context.val());
1134 } else {
1135 ASSERT_EQ(Status::Pending, result);
1136 }
1137 }
1138 {
1139 ReadContext2 context{ idx + 1, 77 };
1140 Status result = new_store.Read(context, callback2, 1);
1141 if(result == Status::Ok) {
1142 ++records_read;
1143 ASSERT_EQ(context.expected, context.val());
1144 } else {
1145 ASSERT_EQ(Status::Pending, result);
1146 }
1147 }
1148 }
1149
1150 new_store.CompletePending(true);
1151 ASSERT_EQ(records_read.load(), kNumRecords);
1152 new_store.StopSession();
1153}
1154
1155TEST(CLASS, Concurrent_Insert_Small) {
1156 class Key {
1157 public:
1158 Key(uint32_t key)
1159 : key_{ key } {
1160 }
1161
1162 inline static constexpr uint32_t size() {
1163 return static_cast<uint32_t>(sizeof(Key));
1164 }
1165 inline KeyHash GetHash() const {
1166 std::hash<uint32_t> hash_fn{};
1167 return KeyHash{ hash_fn(key_) };
1168 }
1169
1170 /// Comparison operators.
1171 inline bool operator==(const Key& other) const {
1172 return key_ == other.key_;
1173 }
1174 inline bool operator!=(const Key& other) const {
1175 return key_ != other.key_;
1176 }
1177
1178 private:
1179 uint32_t key_;
1180 };
1181 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
1182 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
1183
1184 class UpsertContext;
1185 class ReadContext1;
1186 class ReadContext2;
1187
1188 class Value {
1189 public:
1190 Value()
1191 : val_{ 0 } {
1192 }
1193
1194 inline static constexpr uint32_t size() {
1195 return static_cast<uint32_t>(sizeof(Value));
1196 }
1197
1198 friend class UpsertContext;
1199 friend class ReadContext1;
1200 friend class ReadContext2;
1201
1202 private:
1203 union {
1204 std::atomic<uint32_t> atomic_val_;
1205 uint32_t val_;
1206 };
1207 };
1208 static_assert(sizeof(Value) == 4, "sizeof(Value) != 4");
1209 static_assert(alignof(Value) == 4, "alignof(Value) != 4");
1210
1211 class UpsertContext : public IAsyncContext {
1212 public:
1213 typedef Key key_t;
1214 typedef Value value_t;
1215
1216 UpsertContext(const Key& key, uint32_t val)
1217 : key_{ key }
1218 , val_{ val } {
1219 }
1220
1221 /// Copy (and deep-copy) constructor.
1222 UpsertContext(const UpsertContext& other)
1223 : key_{ other.key_ }
1224 , val_{ other.val_ } {
1225 }
1226
1227 /// The implicit and explicit interfaces require a key() accessor.
1228 inline const Key& key() const {
1229 return key_;
1230 }
1231 inline static constexpr uint32_t value_size() {
1232 return sizeof(value_t);
1233 }
1234 /// Non-atomic and atomic Put() methods.
1235 inline void Put(Value& value) {
1236 value.val_ = val_;
1237 }
1238 inline bool PutAtomic(Value& value) {
1239 value.atomic_val_.store(val_);
1240 return true;
1241 }
1242
1243 protected:
1244 /// The explicit interface requires a DeepCopy_Internal() implementation.
1245 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1246 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1247 }
1248
1249 private:
1250 Key key_;
1251 uint32_t val_;
1252 };
1253
1254 static auto upsert_callback = [](IAsyncContext* context, Status result) {
1255 // Upserts don't go to disk.
1256 ASSERT_TRUE(false);
1257 };
1258
1259 std::experimental::filesystem::create_directories("storage");
1260
1261 static constexpr uint32_t kNumRecords = 500000;
1262 static constexpr uint32_t kNumThreads = 2;
1263 static constexpr uint32_t kNumRecordsPerThread = kNumRecords / kNumThreads;
1264
1265 static Guid session_ids[kNumThreads];
1266 std::memset(session_ids, 0, sizeof(session_ids));
1267 static Guid token;
1268
1269 static std::atomic<uint32_t> num_threads_persistent;
1270 num_threads_persistent = 0;
1271 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
1272 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
1273 threads_persistent[idx] = false;
1274 }
1275
1276 static std::atomic<uint32_t> num_threads_started;
1277 num_threads_started = 0;
1278
1279 static auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
1280 bool expected = false;
1281 ASSERT_EQ(Status::Ok, result);
1282 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected, true));
1283 ++num_threads_persistent;
1284 };
1285
1286 typedef FasterKv<Key, Value, disk_t> store_t;
1287
1288 class ReadContext1 : public IAsyncContext {
1289 public:
1290 typedef Key key_t;
1291 typedef Value value_t;
1292
1293 ReadContext1(Key key, uint32_t expected_)
1294 : key_{ key }
1295 , val_{ 0 }
1296 , expected{ expected_ } {
1297 }
1298
1299 /// Copy (and deep-copy) constructor.
1300 ReadContext1(const ReadContext1& other)
1301 : key_{ other.key_ }
1302 , val_{ other.val_ }
1303 , expected{ other.expected } {
1304 }
1305
1306 /// The implicit and explicit interfaces require a key() accessor.
1307 inline const Key& key() const {
1308 return key_;
1309 }
1310
1311 inline void Get(const Value& value) {
1312 val_ = value.val_;
1313 }
1314 inline void GetAtomic(const Value& value) {
1315 val_ = value.atomic_val_.load();
1316 }
1317
1318 uint64_t val() const {
1319 return val_;
1320 }
1321
1322 protected:
1323 /// The explicit interface requires a DeepCopy_Internal() implementation.
1324 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1325 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1326 }
1327
1328 private:
1329 Key key_;
1330 uint32_t val_;
1331 public:
1332 const uint32_t expected;
1333 };
1334
1335 {
1336 // Populate and checkpoint the store.
1337
1338 // 6 pages!
1339 store_t store{ 8192, 201326592, "storage", 0.4 };
1340
1341 auto upsert_checkpoint_worker = [](store_t* store, uint32_t thread_id) {
1342 assert(thread_id == 0);
1343 session_ids[thread_id] = store->StartSession();
1344 ++num_threads_started;
1345
1346 // upsert some records
1347 for(uint32_t idx = kNumRecordsPerThread * thread_id;
1348 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
1349 UpsertContext context{ Key{ idx }, idx + 7 };
1350
1351 Status result = store->Upsert(context, upsert_callback, 1);
1352 ASSERT_EQ(Status::Ok, result);
1353
1354 if(idx % 256 == 0) {
1355 store->Refresh();
1356 }
1357 }
1358
1359 while(num_threads_started < kNumThreads) {
1360 std::this_thread::yield();
1361 }
1362 // checkpoint (transition from REST to INDEX_CHKPT)
1363 ASSERT_TRUE(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token));
1364
1365 // Ensure that the checkpoint completes.
1366 while(num_threads_persistent < kNumThreads) {
1367 store->CompletePending(false);
1368 }
1369
1370 bool result = store->CompletePending(true);
1371 ASSERT_TRUE(result);
1372 store->StopSession();
1373 };
1374
1375 auto upsert_worker = [](store_t* store, uint32_t thread_id) {
1376 assert(thread_id != 0);
1377 session_ids[thread_id] = store->StartSession();
1378 ++num_threads_started;
1379
1380 // upsert some records
1381 for(uint32_t idx = kNumRecordsPerThread * thread_id;
1382 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
1383 UpsertContext context{ Key{ idx }, idx + 7 };
1384 Status result = store->Upsert(context, upsert_callback, 1);
1385 ASSERT_EQ(Status::Ok, result);
1386
1387 if(idx % 256 == 0) {
1388 store->Refresh();
1389 }
1390 }
1391
1392 // Don't exit this session until the checkpoint has completed.
1393 while(num_threads_persistent < kNumThreads) {
1394 store->CompletePending(false);
1395 }
1396
1397 bool result = store->CompletePending(true);
1398 ASSERT_TRUE(result);
1399 store->StopSession();
1400 };
1401
1402 std::deque<std::thread> threads{};
1403 threads.emplace_back(upsert_checkpoint_worker, &store, 0);
1404 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
1405 threads.emplace_back(upsert_worker, &store, idx);
1406 }
1407 for(auto& thread : threads) {
1408 thread.join();
1409 }
1410
1411 // Verify the store.
1412 store.StartSession();
1413
1414 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
1415 auto callback = [](IAsyncContext* ctxt, Status result) {
1416 CallbackContext<ReadContext1> context{ ctxt };
1417 ASSERT_EQ(Status::Ok, result);
1418 ASSERT_EQ(context->expected, context->val());
1419 };
1420
1421 ReadContext1 context{ Key{ idx }, idx + 7 };
1422 Status result = store.Read(context, callback, 1);
1423 if(result != Status::Ok) {
1424 ASSERT_EQ(Status::Pending, result);
1425 }
1426 }
1427
1428 store.StopSession();
1429 }
1430
1431 // Test recovery.
1432 store_t new_store{ 8192, 201326592, "storage", 0.4 };
1433
1434 uint32_t version;
1435 std::vector<Guid> recovered_session_ids;
1436 Status status = new_store.Recover(token, token, version, recovered_session_ids);
1437 ASSERT_EQ(recovered_session_ids.size(), kNumThreads);
1438 ASSERT_EQ(Status::Ok, status);
1439
1440 static std::atomic<uint32_t> records_read;
1441 records_read = 0;
1442
1443 class ReadContext2 : public IAsyncContext {
1444 public:
1445 typedef Key key_t;
1446 typedef Value value_t;
1447
1448 ReadContext2(Key key, uint32_t expected_, uint32_t idx_, std::atomic<bool>* found_)
1449 : key_{ key }
1450 , val_{ 0 }
1451 , expected{ expected_ }
1452 , idx{ idx_ }
1453 , found{ found_ } {
1454 }
1455
1456 /// Copy (and deep-copy) constructor.
1457 ReadContext2(const ReadContext2& other)
1458 : key_{ other.key_ }
1459 , val_{ other.val_ }
1460 , expected{ other.expected }
1461 , idx{ other.idx }
1462 , found{ other.found } {
1463 }
1464
1465 /// The implicit and explicit interfaces require a key() accessor.
1466 inline const Key& key() const {
1467 return key_;
1468 }
1469
1470 inline void Get(const Value& value) {
1471 val_ = value.val_;
1472 }
1473 inline void GetAtomic(const Value& value) {
1474 val_ = value.atomic_val_.load();
1475 }
1476
1477 uint64_t val() const {
1478 return val_;
1479 }
1480
1481 protected:
1482 /// The explicit interface requires a DeepCopy_Internal() implementation.
1483 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1484 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1485 }
1486
1487 private:
1488 Key key_;
1489 uint32_t val_;
1490 public:
1491 const uint32_t expected;
1492 const uint32_t idx;
1493 std::atomic<bool>* found;
1494 };
1495
1496 auto read_worker = [](store_t* store, uint32_t thread_id) {
1497 uint64_t serial_num = store->ContinueSession(session_ids[thread_id]);
1498 ASSERT_EQ(1, serial_num);
1499
1500 std::unique_ptr<std::atomic<bool>> found{ new std::atomic<bool>[kNumRecordsPerThread] };
1501 std::memset(found.get(), 0, sizeof(found.get()[0]) * kNumRecordsPerThread);
1502
1503 // verify records
1504 auto callback = [](IAsyncContext* ctxt, Status result) {
1505 CallbackContext<ReadContext2> context{ ctxt };
1506 if(result == Status::Ok) {
1507 ++records_read;
1508 ASSERT_EQ(context->expected, context->val());
1509 bool expected = false;
1510 ASSERT_TRUE(context->found[context->idx].compare_exchange_strong(expected, true));
1511 } else {
1512 ASSERT_EQ(Status::NotFound, result);
1513 ASSERT_FALSE(context->found[context->idx].load());
1514 }
1515 };
1516 for(uint32_t idx = kNumRecordsPerThread * thread_id;
1517 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
1518 ReadContext2 context{ Key{ idx }, idx + 7, idx - (kNumRecordsPerThread * thread_id),
1519 found.get() };
1520 Status result = store->Read(context, callback, 1);
1521 if(result == Status::Ok) {
1522 ++records_read;
1523 ASSERT_EQ(context.expected, context.val());
1524 bool expected = false;
1525 ASSERT_TRUE(found.get()[context.idx].compare_exchange_strong(expected, true));
1526 } else {
1527 ASSERT_TRUE(result == Status::Pending || result == Status::NotFound);
1528 if(result == Status::NotFound) {
1529 ASSERT_FALSE(found.get()[context.idx].load());
1530 }
1531 }
1532
1533 if(idx % 256 == 0) {
1534 store->Refresh();
1535 store->CompletePending(false);
1536 }
1537 }
1538 store->CompletePending(true);
1539 store->StopSession();
1540
1541 bool found_all = true;
1542 for(uint32_t idx = 0; idx < kNumRecordsPerThread; ++idx) {
1543 if(found_all != found.get()[idx]) {
1544 // Consistent-point recovery implies that after one record isn't found, all subsequent
1545 // records will not be found.
1546 Key key{ kNumRecordsPerThread* thread_id + idx };
1547 KeyHash hash = key.GetHash();
1548 std::string error;
1549 error += "key = ";
1550 error += std::to_string(kNumRecordsPerThread* thread_id + idx);
1551 error += ", idx = ";
1552 error += std::to_string(hash.idx(8192));
1553 error += ", tag = ";
1554 error += std::to_string(hash.tag());
1555 ASSERT_TRUE(found_all) << error;
1556 found_all = false;
1557 }
1558 }
1559 };
1560
1561 std::deque<std::thread> threads{};
1562 for(uint32_t idx = 0; idx < kNumThreads; ++idx) {
1563 threads.emplace_back(read_worker, &new_store, idx);
1564 }
1565 for(auto& thread : threads) {
1566 thread.join();
1567 }
1568
1569 ASSERT_GT(records_read, (uint32_t)0);
1570 ASSERT_LE(records_read, kNumRecords);
1571}
1572
1573TEST(CLASS, Concurrent_Insert_Large) {
1574 class Key {
1575 public:
1576 Key(uint32_t key)
1577 : key_{ key } {
1578 }
1579
1580 inline static constexpr uint32_t size() {
1581 return static_cast<uint32_t>(sizeof(Key));
1582 }
1583 inline KeyHash GetHash() const {
1584 std::hash<uint32_t> hash_fn{};
1585 return KeyHash{ hash_fn(key_) };
1586 }
1587
1588 /// Comparison operators.
1589 inline bool operator==(const Key& other) const {
1590 return key_ == other.key_;
1591 }
1592 inline bool operator!=(const Key& other) const {
1593 return key_ != other.key_;
1594 }
1595
1596 private:
1597 uint32_t key_;
1598 };
1599 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
1600 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
1601
1602 class UpsertContext;
1603 class ReadContext1;
1604 class ReadContext2;
1605
1606 class Value {
1607 public:
1608 Value()
1609 : val_{ 0 } {
1610 }
1611
1612 inline static constexpr uint32_t size() {
1613 return static_cast<uint32_t>(sizeof(Value));
1614 }
1615
1616 friend class UpsertContext;
1617 friend class ReadContext1;
1618 friend class ReadContext2;
1619
1620 private:
1621 union {
1622 std::atomic<uint32_t> atomic_val_;
1623 uint32_t val_;
1624 };
1625 };
1626 static_assert(sizeof(Value) == 4, "sizeof(Value) != 4");
1627 static_assert(alignof(Value) == 4, "alignof(Value) != 4");
1628
1629 class UpsertContext : public IAsyncContext {
1630 public:
1631 typedef Key key_t;
1632 typedef Value value_t;
1633
1634 UpsertContext(const Key& key, uint32_t val)
1635 : key_{ key }
1636 , val_{ val } {
1637 }
1638
1639 /// Copy (and deep-copy) constructor.
1640 UpsertContext(const UpsertContext& other)
1641 : key_{ other.key_ }
1642 , val_{ other.val_ } {
1643 }
1644
1645 /// The implicit and explicit interfaces require a key() accessor.
1646 inline const Key& key() const {
1647 return key_;
1648 }
1649 inline static constexpr uint32_t value_size() {
1650 return sizeof(value_t);
1651 }
1652 /// Non-atomic and atomic Put() methods.
1653 inline void Put(Value& value) {
1654 value.val_ = val_;
1655 }
1656 inline bool PutAtomic(Value& value) {
1657 value.atomic_val_.store(val_);
1658 return true;
1659 }
1660
1661 protected:
1662 /// The explicit interface requires a DeepCopy_Internal() implementation.
1663 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1664 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1665 }
1666
1667 private:
1668 Key key_;
1669 uint32_t val_;
1670 };
1671
1672 static auto upsert_callback = [](IAsyncContext* context, Status result) {
1673 // Upserts don't go to disk.
1674 ASSERT_TRUE(false);
1675 };
1676
1677 std::experimental::filesystem::create_directories("storage");
1678
1679 static constexpr uint32_t kNumRecords = 1000000;
1680 static constexpr uint32_t kNumThreads = 2;
1681 static constexpr uint32_t kNumRecordsPerThread = kNumRecords / kNumThreads;
1682
1683 static Guid session_ids[kNumThreads];
1684 std::memset(session_ids, 0, sizeof(session_ids));
1685 static Guid token;
1686
1687 static std::atomic<uint32_t> num_threads_persistent;
1688 num_threads_persistent = 0;
1689 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
1690 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
1691 threads_persistent[idx] = false;
1692 }
1693
1694 static std::atomic<uint32_t> num_threads_started;
1695 num_threads_started = 0;
1696
1697 static auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
1698 bool expected = false;
1699 ASSERT_EQ(Status::Ok, result);
1700 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected, true));
1701 ++num_threads_persistent;
1702 };
1703
1704 typedef FasterKv<Key, Value, disk_t> store_t;
1705
1706 class ReadContext1 : public IAsyncContext {
1707 public:
1708 typedef Key key_t;
1709 typedef Value value_t;
1710
1711 ReadContext1(Key key, uint32_t expected_)
1712 : key_{ key }
1713 , val_{ 0 }
1714 , expected{ expected_ } {
1715 }
1716
1717 /// Copy (and deep-copy) constructor.
1718 ReadContext1(const ReadContext1& other)
1719 : key_{ other.key_ }
1720 , val_{ other.val_ }
1721 , expected{ other.expected } {
1722 }
1723
1724 /// The implicit and explicit interfaces require a key() accessor.
1725 inline const Key& key() const {
1726 return key_;
1727 }
1728
1729 inline void Get(const Value& value) {
1730 val_ = value.val_;
1731 }
1732 inline void GetAtomic(const Value& value) {
1733 val_ = value.atomic_val_.load();
1734 }
1735
1736 uint64_t val() const {
1737 return val_;
1738 }
1739
1740 protected:
1741 /// The explicit interface requires a DeepCopy_Internal() implementation.
1742 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1743 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1744 }
1745
1746 private:
1747 Key key_;
1748 uint32_t val_;
1749 public:
1750 const uint32_t expected;
1751 };
1752
1753 {
1754 // Populate and checkpoint the store.
1755
1756 // 6 pages!
1757 store_t store{ 524288, 201326592, "storage", 0.4 };
1758
1759 auto upsert_checkpoint_worker = [](store_t* store, uint32_t thread_id) {
1760 assert(thread_id == 0);
1761 session_ids[thread_id] = store->StartSession();
1762 ++num_threads_started;
1763
1764 // upsert some records
1765 for(uint32_t idx = kNumRecordsPerThread * thread_id;
1766 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
1767 UpsertContext context{ Key{ idx }, idx + 7 };
1768
1769 Status result = store->Upsert(context, upsert_callback, 1);
1770 ASSERT_EQ(Status::Ok, result);
1771
1772 if(idx % 256 == 0) {
1773 store->Refresh();
1774 }
1775 }
1776
1777 while(num_threads_started < kNumThreads) {
1778 std::this_thread::yield();
1779 }
1780 // checkpoint (transition from REST to INDEX_CHKPT)
1781 ASSERT_TRUE(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token));
1782
1783 // Ensure that the checkpoint completes.
1784 while(num_threads_persistent < kNumThreads) {
1785 store->CompletePending(false);
1786 }
1787
1788 bool result = store->CompletePending(true);
1789 ASSERT_TRUE(result);
1790 store->StopSession();
1791 };
1792
1793 auto upsert_worker = [](store_t* store, uint32_t thread_id) {
1794 assert(thread_id != 0);
1795 session_ids[thread_id] = store->StartSession();
1796 ++num_threads_started;
1797
1798 // upsert some records
1799 for(uint32_t idx = kNumRecordsPerThread * thread_id;
1800 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
1801 UpsertContext context{ Key{ idx }, idx + 7 };
1802 Status result = store->Upsert(context, upsert_callback, 1);
1803 ASSERT_EQ(Status::Ok, result);
1804
1805 if(idx % 256 == 0) {
1806 store->Refresh();
1807 }
1808 }
1809
1810 // Don't exit this session until the checkpoint has completed.
1811 while(num_threads_persistent < kNumThreads) {
1812 store->CompletePending(false);
1813 }
1814
1815 bool result = store->CompletePending(true);
1816 ASSERT_TRUE(result);
1817 store->StopSession();
1818 };
1819
1820 std::deque<std::thread> threads{};
1821 threads.emplace_back(upsert_checkpoint_worker, &store, 0);
1822 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
1823 threads.emplace_back(upsert_worker, &store, idx);
1824 }
1825 for(auto& thread : threads) {
1826 thread.join();
1827 }
1828
1829 // Verify the store.
1830 store.StartSession();
1831 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
1832 auto callback = [](IAsyncContext* ctxt, Status result) {
1833 CallbackContext<ReadContext1> context{ ctxt };
1834 ASSERT_EQ(Status::Ok, result);
1835 ASSERT_EQ(context->expected, context->val());
1836 };
1837
1838 ReadContext1 context{ Key{ idx }, idx + 7 };
1839 Status result = store.Read(context, callback, 1);
1840 if(result != Status::Ok) {
1841 ASSERT_EQ(Status::Pending, result);
1842 }
1843 }
1844 store.StopSession();
1845 }
1846
1847 // Test recovery.
1848 store_t new_store{ 524288, 201326592, "storage", 0.4 };
1849
1850 uint32_t version;
1851 std::vector<Guid> recovered_session_ids;
1852 Status status = new_store.Recover(token, token, version, recovered_session_ids);
1853 ASSERT_EQ(recovered_session_ids.size(), kNumThreads);
1854 ASSERT_EQ(Status::Ok, status);
1855
1856 static std::atomic<uint32_t> records_read;
1857 records_read = 0;
1858
1859 class ReadContext2 : public IAsyncContext {
1860 public:
1861 typedef Key key_t;
1862 typedef Value value_t;
1863
1864 ReadContext2(Key key, uint32_t expected_, uint32_t idx_, std::atomic<bool>* found_)
1865 : key_{ key }
1866 , val_{ 0 }
1867 , expected{ expected_ }
1868 , idx{ idx_ }
1869 , found{ found_ } {
1870 }
1871
1872 /// Copy (and deep-copy) constructor.
1873 ReadContext2(const ReadContext2& other)
1874 : key_{ other.key_ }
1875 , val_{ other.val_ }
1876 , expected{ other.expected }
1877 , idx{ other.idx }
1878 , found{ other.found } {
1879 }
1880
1881 /// The implicit and explicit interfaces require a key() accessor.
1882 inline const Key& key() const {
1883 return key_;
1884 }
1885
1886 inline void Get(const Value& value) {
1887 val_ = value.val_;
1888 }
1889 inline void GetAtomic(const Value& value) {
1890 val_ = value.atomic_val_.load();
1891 }
1892
1893 uint64_t val() const {
1894 return val_;
1895 }
1896
1897 protected:
1898 /// The explicit interface requires a DeepCopy_Internal() implementation.
1899 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1900 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1901 }
1902
1903 private:
1904 Key key_;
1905 uint32_t val_;
1906 public:
1907 const uint32_t expected;
1908 const uint32_t idx;
1909 std::atomic<bool>* found;
1910 };
1911
1912 auto read_worker = [](store_t* store, uint32_t thread_id) {
1913 uint64_t serial_num = store->ContinueSession(session_ids[thread_id]);
1914 ASSERT_EQ(1, serial_num);
1915
1916 std::unique_ptr<std::atomic<bool>> found{ new std::atomic<bool>[kNumRecordsPerThread] };
1917 std::memset(found.get(), 0, sizeof(found.get()[0]) * kNumRecordsPerThread);
1918
1919 // verify records
1920 auto callback = [](IAsyncContext* ctxt, Status result) {
1921 CallbackContext<ReadContext2> context{ ctxt };
1922 if(result == Status::Ok) {
1923 ++records_read;
1924 ASSERT_EQ(context->expected, context->val());
1925 bool expected = false;
1926 ASSERT_TRUE(context->found[context->idx].compare_exchange_strong(expected, true));
1927 } else {
1928 ASSERT_EQ(Status::NotFound, result);
1929 ASSERT_FALSE(context->found[context->idx].load());
1930 }
1931 };
1932 for(uint32_t idx = kNumRecordsPerThread * thread_id;
1933 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
1934 ReadContext2 context{ Key{ idx }, idx + 7, idx - (kNumRecordsPerThread * thread_id),
1935 found.get() };
1936 Status result = store->Read(context, callback, 1);
1937 if(result == Status::Ok) {
1938 ++records_read;
1939 ASSERT_EQ(context.expected, context.val());
1940 bool expected = false;
1941 ASSERT_TRUE(found.get()[context.idx].compare_exchange_strong(expected, true));
1942 } else {
1943 ASSERT_TRUE(result == Status::Pending || result == Status::NotFound);
1944 if(result == Status::NotFound) {
1945 ASSERT_FALSE(found.get()[context.idx].load());
1946 }
1947 }
1948
1949 if(idx % 256 == 0) {
1950 store->Refresh();
1951 store->CompletePending(false);
1952 }
1953 }
1954 store->CompletePending(true);
1955 store->StopSession();
1956
1957 bool found_all = true;
1958 for(uint32_t idx = 0; idx < kNumRecordsPerThread; ++idx) {
1959 if(found_all != found.get()[idx]) {
1960 // Consistent-point recovery implies that after one record isn't found, all subsequent
1961 // records will not be found.
1962 Key key{ kNumRecordsPerThread* thread_id + idx };
1963 KeyHash hash = key.GetHash();
1964 std::string error;
1965 error += "key = ";
1966 error += std::to_string(kNumRecordsPerThread* thread_id + idx);
1967 error += ", idx = ";
1968 error += std::to_string(hash.idx(8192));
1969 error += ", tag = ";
1970 error += std::to_string(hash.tag());
1971 ASSERT_TRUE(found_all) << error;
1972 found_all = false;
1973 }
1974 }
1975 };
1976
1977 std::deque<std::thread> threads{};
1978 for(uint32_t idx = 0; idx < kNumThreads; ++idx) {
1979 threads.emplace_back(read_worker, &new_store, idx);
1980 }
1981 for(auto& thread : threads) {
1982 thread.join();
1983 }
1984
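  // Consistent-point recovery may drop a suffix of each session's updates, so the test expects
  // at least one record, and at most kNumRecords, to be readable after recovery.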
1985 ASSERT_GT(records_read, (uint32_t)0);
1986 ASSERT_LE(records_read, kNumRecords);
1987}
1988
1989TEST(CLASS, Concurrent_Update_Small) {
1990 class Key {
1991 public:
1992 Key(uint32_t key)
1993 : key_{ key } {
1994 }
1995 inline static constexpr uint32_t size() {
1996 return static_cast<uint32_t>(sizeof(Key));
1997 }
1998 inline KeyHash GetHash() const {
1999 std::hash<uint32_t> hash_fn{};
2000 return KeyHash{ hash_fn(key_) };
2001 }
2002
2003 /// Comparison operators.
2004 inline bool operator==(const Key& other) const {
2005 return key_ == other.key_;
2006 }
2007 inline bool operator!=(const Key& other) const {
2008 return key_ != other.key_;
2009 }
2010
2011 private:
2012 uint32_t key_;
2013 };
2014 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
2015 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
2016
2017 class UpsertContext;
2018 class ReadContext1;
2019 class ReadContext2;
2020
2021 class Value {
2022 public:
2023 Value()
2024 : val_{ 0 } {
2025 }
2026
2027 inline static constexpr uint32_t size() {
2028 return static_cast<uint32_t>(sizeof(Value));
2029 }
2030
2031 friend class UpsertContext;
2032 friend class ReadContext1;
2033 friend class ReadContext2;
2034
2035 private:
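    // The same 32-bit slot is exposed both as a plain field (used by the non-atomic
    // Put()/Get() paths) and as an atomic (used by PutAtomic()/GetAtomic()).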
2036 union {
2037 std::atomic<uint32_t> atomic_val_;
2038 uint32_t val_;
2039 };
2040 };
2041 static_assert(sizeof(Value) == 4, "sizeof(Value) != 4");
2042 static_assert(alignof(Value) == 4, "alignof(Value) != 4");
2043
2044 class UpsertContext : public IAsyncContext {
2045 public:
2046 typedef Key key_t;
2047 typedef Value value_t;
2048
2049 UpsertContext(const Key& key, uint32_t val)
2050 : key_{ key }
2051 , val_{ val } {
2052 }
2053
2054 /// Copy (and deep-copy) constructor.
2055 UpsertContext(const UpsertContext& other)
2056 : key_{ other.key_ }
2057 , val_{ other.val_ } {
2058 }
2059
2060 /// The implicit and explicit interfaces require a key() accessor.
2061 inline const Key& key() const {
2062 return key_;
2063 }
2064 inline static constexpr uint32_t value_size() {
2065 return sizeof(value_t);
2066 }
2067 /// Non-atomic and atomic Put() methods.
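    /// Put() fills a record not yet visible to other threads; PutAtomic() updates a record
    /// already in the mutable region in place.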
2068 inline void Put(Value& value) {
2069 value.val_ = val_;
2070 }
2071 inline bool PutAtomic(Value& value) {
2072 value.atomic_val_.store(val_);
2073 return true;
2074 }
2075
2076 protected:
2077 /// The explicit interface requires a DeepCopy_Internal() implementation.
2078 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2079 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2080 }
2081
2082 private:
2083 Key key_;
2084 uint32_t val_;
2085 };
2086
2087 static auto upsert_callback = [](IAsyncContext* context, Status result) {
2088 // Upserts don't go to disk.
2089 ASSERT_TRUE(false);
2090 };
2091
2092 std::experimental::filesystem::create_directories("storage");
2093
2094 static constexpr uint32_t kNumRecords = 200000;
2095 static constexpr uint32_t kNumThreads = 2;
2096 static constexpr uint32_t kNumRecordsPerThread = kNumRecords / kNumThreads;
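  // Each thread owns the contiguous key range
  // [thread_id * kNumRecordsPerThread, (thread_id + 1) * kNumRecordsPerThread).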
2097
2098 static Guid session_ids[kNumThreads];
2099 std::memset(session_ids, 0, sizeof(session_ids));
2100 static Guid token;
2101
2102 static std::atomic<uint32_t> num_threads_persistent;
2103 num_threads_persistent = 0;
2104 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
2105 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
2106 threads_persistent[idx] = false;
2107 }
2108
2109 static std::atomic<uint32_t> num_threads_started;
2110 num_threads_started = 0;
2111
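  // Called once per active session when that session's updates, through persistent_serial_num,
  // have been captured by the checkpoint.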
2112 static auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
2113 bool expected = false;
2114 ASSERT_EQ(Status::Ok, result);
2115 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected, true));
2116 ++num_threads_persistent;
2117 };
2118
2119 typedef FasterKv<Key, Value, disk_t> store_t;
2120
2121 class ReadContext1 : public IAsyncContext {
2122 public:
2123 typedef Key key_t;
2124 typedef Value value_t;
2125
2126 ReadContext1(Key key, uint32_t expected_)
2127 : key_{ key }
2128 , val_{ 0 }
2129 , expected{ expected_ } {
2130 }
2131
2132 /// Copy (and deep-copy) constructor.
2133 ReadContext1(const ReadContext1& other)
2134 : key_{ other.key_ }
2135 , val_{ other.val_ }
2136 , expected{ other.expected } {
2137 }
2138
2139 /// The implicit and explicit interfaces require a key() accessor.
2140 inline const Key& key() const {
2141 return key_;
2142 }
2143
2144 inline void Get(const Value& value) {
2145 val_ = value.val_;
2146 }
2147 inline void GetAtomic(const Value& value) {
2148 val_ = value.atomic_val_.load();
2149 }
2150
2151 uint64_t val() const {
2152 return val_;
2153 }
2154
2155 protected:
2156 /// The explicit interface requires a DeepCopy_Internal() implementation.
2157 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2158 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2159 }
2160
2161 private:
2162 Key key_;
2163 uint32_t val_;
2164 public:
2165 const uint32_t expected;
2166 };
2167
2168 {
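  // Scope the original store so it is closed before recovering into a fresh store below.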
2169 // 6 pages!
2170 store_t store{ 8192, 201326592, "storage", 0.4 };
2171
2172 // Populate the store.
2173 store.StartSession();
2174 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
2175 UpsertContext context{ Key{ idx }, 999 };
2176 Status result = store.Upsert(context, upsert_callback, 1);
2177 ASSERT_EQ(Status::Ok, result);
2178 if(idx % 256 == 0) {
2179 store.Refresh();
2180 store.CompletePending(false);
2181 }
2182 }
2183 store.StopSession();
2184
2185 /// Update and checkpoint the store.
2186 auto upsert_checkpoint_worker = [](store_t* store, uint32_t thread_id) {
2187 assert(thread_id == 0);
2188 session_ids[thread_id] = store->StartSession();
2189 ++num_threads_started;
2190
2191 // update some records
2192 for(uint32_t idx = kNumRecordsPerThread * thread_id;
2193 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
2194 UpsertContext context{ Key{ idx }, idx + 1 };
2195
2196 Status result = store->Upsert(context, upsert_callback, idx + 1);
2197 ASSERT_EQ(Status::Ok, result);
2198
2199 if(idx % 256 == 0) {
2200 store->Refresh();
2201 }
2202 }
2203
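      // Wait for every worker to start its session so that the checkpoint covers all of them.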
2204 while(num_threads_started < kNumThreads) {
2205 std::this_thread::yield();
2206 }
2207 // checkpoint (transition from REST to INDEX_CHKPT)
2208 ASSERT_TRUE(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token));
2209
2210 // Ensure that the checkpoint completes.
2211 while(num_threads_persistent < kNumThreads) {
2212 store->CompletePending(false);
2213 }
2214
2215 bool result = store->CompletePending(true);
2216 ASSERT_TRUE(result);
2217 store->StopSession();
2218 };
2219
2220 auto upsert_worker = [](store_t* store, uint32_t thread_id) {
2221 assert(thread_id != 0);
2222 session_ids[thread_id] = store->StartSession();
2223 ++num_threads_started;
2224
2225 // update some records
2226 for(uint32_t idx = kNumRecordsPerThread * thread_id;
2227 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
2228 UpsertContext context{ Key{ idx }, idx + 1 };
2229 Status result = store->Upsert(context, upsert_callback, idx + 1);
2230 ASSERT_EQ(Status::Ok, result);
2231
2232 if(idx % 256 == 0) {
2233 store->Refresh();
2234 }
2235 }
2236
2237 // Don't exit this session until the checkpoint has completed.
2238 while(num_threads_persistent < kNumThreads) {
2239 store->CompletePending(false);
2240 }
2241
2242 bool result = store->CompletePending(true);
2243 ASSERT_TRUE(result);
2244 store->StopSession();
2245 };
2246
2247 std::deque<std::thread> threads{};
2248 threads.emplace_back(upsert_checkpoint_worker, &store, 0);
2249 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
2250 threads.emplace_back(upsert_worker, &store, idx);
2251 }
2252 for(auto& thread : threads) {
2253 thread.join();
2254 }
2255
2256 // Verify the store.
2257 store.StartSession();
2258 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
2259 auto callback = [](IAsyncContext* ctxt, Status result) {
2260 CallbackContext<ReadContext1> context{ ctxt };
2261 ASSERT_EQ(Status::Ok, result);
2262 ASSERT_EQ(context->expected, context->val());
2263 };
2264
2265 ReadContext1 context{ Key{ idx }, idx + 1 };
2266 Status result = store.Read(context, callback, 1);
2267 if(result != Status::Ok) {
2268 ASSERT_EQ(Status::Pending, result);
2269 }
2270 }
2271 store.StopSession();
2272 }
2273
2274 // Test recovery.
2275 store_t new_store{ 8192, 201326592, "storage", 0.4 };
2276
2277 uint32_t version;
2278 std::vector<Guid> recovered_session_ids;
2279 Status status = new_store.Recover(token, token, version, recovered_session_ids);
2280 ASSERT_EQ(recovered_session_ids.size(), kNumThreads);
2281 ASSERT_EQ(Status::Ok, status);
2282
2283 static std::atomic<uint32_t> records_read;
2284 records_read = 0;
2285
2286 class ReadContext2 : public IAsyncContext {
2287 public:
2288 typedef Key key_t;
2289 typedef Value value_t;
2290
2291 ReadContext2(Key key, uint32_t expected_, uint32_t idx_, std::atomic<bool>* found_)
2292 : key_{ key }
2293 , val_{ 0 }
2294 , expected{ expected_ }
2295 , idx{ idx_ }
2296 , found{ found_ } {
2297 }
2298
2299 /// Copy (and deep-copy) constructor.
2300 ReadContext2(const ReadContext2& other)
2301 : key_{ other.key_ }
2302 , val_{ other.val_ }
2303 , expected{ other.expected }
2304 , idx{ other.idx }
2305 , found{ other.found } {
2306 }
2307
2308 /// The implicit and explicit interfaces require a key() accessor.
2309 inline const Key& key() const {
2310 return key_;
2311 }
2312
2313 inline void Get(const Value& value) {
2314 val_ = value.val_;
2315 }
2316 inline void GetAtomic(const Value& value) {
2317 val_ = value.atomic_val_.load();
2318 }
2319
2320 uint64_t val() const {
2321 return val_;
2322 }
2323
2324 protected:
2325 /// The explicit interface requires a DeepCopy_Internal() implementation.
2326 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2327 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2328 }
2329
2330 private:
2331 Key key_;
2332 uint32_t val_;
2333 public:
2334 const uint32_t expected;
2335 const uint32_t idx;
2336 std::atomic<bool>* found;
2337 };
2338
2339 auto read_worker = [](store_t* store, uint32_t thread_id) {
2340 uint64_t serial_num = store->ContinueSession(session_ids[thread_id]);
2341 ASSERT_GE(serial_num, 1);
2342
    std::unique_ptr<std::atomic<bool>[]> found{ new std::atomic<bool>[kNumRecordsPerThread] };
2344 std::memset(found.get(), 0, sizeof(found.get()[0]) * kNumRecordsPerThread);
2345
2346 // verify records
2347 auto callback = [](IAsyncContext* ctxt, Status result) {
2348 CallbackContext<ReadContext2> context{ ctxt };
2349 ASSERT_EQ(Status::Ok, result);
2350 if(context->expected == context->val()) {
2351 bool expected = false;
2352 ASSERT_TRUE(context->found[context->idx].compare_exchange_strong(expected, true));
2353 } else {
2354 ASSERT_EQ(999, context->val());
2355 bool expected = false;
2356 ASSERT_FALSE(context->found[context->idx].load());
2357 }
2358 };
2359 for(uint32_t idx = kNumRecordsPerThread * thread_id;
2360 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
2361 ReadContext2 context{ Key{ idx }, idx + 1, idx - (kNumRecordsPerThread * thread_id),
2362 found.get() };
2363 Status result = store->Read(context, callback, 1);
2364 if(result == Status::Ok) {
2365 ++records_read;
2366 if(context.expected == context.val()) {
2367 bool expected = false;
2368 ASSERT_TRUE(found.get()[context.idx].compare_exchange_strong(expected, true));
2369 } else {
2370 ASSERT_EQ(999, context.val());
2371 bool expected = false;
2372 ASSERT_FALSE(found.get()[context.idx].load());
2373 }
2374 } else {
2375 ASSERT_EQ(Status::Pending, result);
2376 }
2377 if(idx % 256 == 0) {
2378 store->Refresh();
2379 store->CompletePending(false);
2380 }
2381 }
2382 store->CompletePending(true);
2383 store->StopSession();
2384
2385 bool found_all = true;
2386 for(uint32_t idx = 0; idx < kNumRecordsPerThread; ++idx) {
2387 if(found_all != found.get()[idx]) {
2388 // Consistent-point recovery implies that after one record isn't found, all subsequent
2389 // records will not be found.
2390 Key key{ kNumRecordsPerThread* thread_id + idx };
2391 KeyHash hash = key.GetHash();
2392 std::string error;
2393 error += "key = ";
2394 error += std::to_string(kNumRecordsPerThread* thread_id + idx);
2395 error += ", idx = ";
2396 error += std::to_string(hash.idx(8192));
2397 error += ", tag = ";
2398 error += std::to_string(hash.tag());
2399 ASSERT_TRUE(found_all) << error;
2400 found_all = false;
2401 }
2402 }
2403 };
2404
2405 std::deque<std::thread> threads{};
2406 for(uint32_t idx = 0; idx < kNumThreads; ++idx) {
2407 threads.emplace_back(read_worker, &new_store, idx);
2408 }
2409 for(auto& thread : threads) {
2410 thread.join();
2411 }
2412
2413 ASSERT_GT(records_read, (uint32_t)0);
2414 ASSERT_LE(records_read, kNumRecords);
2415}
2416
2417TEST(CLASS, Concurrent_Update_Large) {
2418 class Key {
2419 public:
2420 Key(uint32_t key)
2421 : key_{ key } {
2422 }
2423
2424 inline static constexpr uint32_t size() {
2425 return static_cast<uint32_t>(sizeof(Key));
2426 }
2427 inline KeyHash GetHash() const {
2428 std::hash<uint32_t> hash_fn{};
2429 return KeyHash{ hash_fn(key_) };
2430 }
2431
2432 /// Comparison operators.
2433 inline bool operator==(const Key& other) const {
2434 return key_ == other.key_;
2435 }
2436 inline bool operator!=(const Key& other) const {
2437 return key_ != other.key_;
2438 }
2439
2440 private:
2441 uint32_t key_;
2442 };
2443 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
2444 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
2445
2446 class UpsertContext;
2447 class ReadContext1;
2448 class ReadContext2;
2449
2450 class Value {
2451 public:
2452 Value()
2453 : val_{ 0 } {
2454 }
2455
2456 inline static constexpr uint32_t size() {
2457 return static_cast<uint32_t>(sizeof(Value));
2458 }
2459
2460 friend class UpsertContext;
2461 friend class ReadContext1;
2462 friend class ReadContext2;
2463
2464 private:
2465 union {
2466 std::atomic<uint32_t> atomic_val_;
2467 uint32_t val_;
2468 };
2469 };
2470 static_assert(sizeof(Value) == 4, "sizeof(Value) != 4");
2471 static_assert(alignof(Value) == 4, "alignof(Value) != 4");
2472
2473 class UpsertContext : public IAsyncContext {
2474 public:
2475 typedef Key key_t;
2476 typedef Value value_t;
2477
2478 UpsertContext(const Key& key, uint32_t val)
2479 : key_{ key }
2480 , val_{ val } {
2481 }
2482
2483 /// Copy (and deep-copy) constructor.
2484 UpsertContext(const UpsertContext& other)
2485 : key_{ other.key_ }
2486 , val_{ other.val_ } {
2487 }
2488
2489 /// The implicit and explicit interfaces require a key() accessor.
2490 inline const Key& key() const {
2491 return key_;
2492 }
2493 inline static constexpr uint32_t value_size() {
2494 return sizeof(value_t);
2495 }
2496 /// Non-atomic and atomic Put() methods.
2497 inline void Put(Value& value) {
2498 value.val_ = val_;
2499 }
2500 inline bool PutAtomic(Value& value) {
2501 value.atomic_val_.store(val_);
2502 return true;
2503 }
2504
2505 protected:
2506 /// The explicit interface requires a DeepCopy_Internal() implementation.
2507 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2508 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2509 }
2510
2511 private:
2512 Key key_;
2513 uint32_t val_;
2514 };
2515
2516 static auto upsert_callback = [](IAsyncContext* context, Status result) {
2517 // Upserts don't go to disk.
2518 ASSERT_TRUE(false);
2519 };
2520
2521 std::experimental::filesystem::create_directories("storage");
2522
2523 static constexpr uint32_t kNumRecords = 1000000;
2524 static constexpr uint32_t kNumThreads = 2;
2525 static constexpr uint32_t kNumRecordsPerThread = kNumRecords / kNumThreads;
2526
2527 static Guid session_ids[kNumThreads];
2528 std::memset(session_ids, 0, sizeof(session_ids));
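  // This test checkpoints the index and the hybrid log separately, hence the two tokens.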
2529 static Guid index_token;
2530 static Guid hybrid_log_token;
2531
2532 static std::atomic<bool> index_checkpoint_completed;
2533 index_checkpoint_completed = false;
2534 static std::atomic<uint32_t> num_threads_persistent;
2535 num_threads_persistent = 0;
2536 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
2537 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
2538 threads_persistent[idx] = false;
2539 }
2540
2541 static std::atomic<uint32_t> num_threads_started;
2542 num_threads_started = 0;
2543
2544 static auto index_persistence_callback = [](Status result) {
2545 ASSERT_EQ(Status::Ok, result);
2546 index_checkpoint_completed = true;
2547 };
2548
2549 static auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
2550 bool expected = false;
2551 ASSERT_EQ(Status::Ok, result);
2552 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected, true));
2553 ++num_threads_persistent;
2554 };
2555
2556 typedef FasterKv<Key, Value, disk_t> store_t;
2557
2558 class ReadContext1 : public IAsyncContext {
2559 public:
2560 typedef Key key_t;
2561 typedef Value value_t;
2562
2563 ReadContext1(Key key, uint32_t expected_)
2564 : key_{ key }
2565 , val_{ 0 }
2566 , expected{ expected_ } {
2567 }
2568
2569 /// Copy (and deep-copy) constructor.
2570 ReadContext1(const ReadContext1& other)
2571 : key_{ other.key_ }
2572 , val_{ other.val_ }
2573 , expected{ other.expected } {
2574 }
2575
2576 /// The implicit and explicit interfaces require a key() accessor.
2577 inline const Key& key() const {
2578 return key_;
2579 }
2580
2581 inline void Get(const Value& value) {
2582 val_ = value.val_;
2583 }
2584 inline void GetAtomic(const Value& value) {
2585 val_ = value.atomic_val_.load();
2586 }
2587
2588 uint64_t val() const {
2589 return val_;
2590 }
2591
2592 protected:
2593 /// The explicit interface requires a DeepCopy_Internal() implementation.
2594 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2595 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2596 }
2597
2598 private:
2599 Key key_;
2600 uint32_t val_;
2601 public:
2602 const uint32_t expected;
2603 };
2604
2605 {
2606 // 6 pages!
2607 store_t store{ 524288, 201326592, "storage", 0.4 };
2608
2609 // Populate the store.
2610 store.StartSession();
2611 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
2612 UpsertContext context{ Key{ idx }, 999 };
2613 Status result = store.Upsert(context, upsert_callback, 1);
2614 ASSERT_EQ(Status::Ok, result);
2615 if(idx % 256 == 0) {
2616 store.Refresh();
2617 store.CompletePending(false);
2618 }
2619 }
2620
2621 store.StopSession();
2622
2623 /// Update and checkpoint the store.
2624 auto upsert_checkpoint_worker = [](store_t* store, uint32_t thread_id) {
2625 assert(thread_id == 0);
2626 session_ids[thread_id] = store->StartSession();
2627 ++num_threads_started;
2628
2629 // update some records
2630 for(uint32_t idx = kNumRecordsPerThread * thread_id;
2631 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
2632 UpsertContext context{ Key{ idx }, idx + 1 };
2633
2634 Status result = store->Upsert(context, upsert_callback, idx + 1);
2635 ASSERT_EQ(Status::Ok, result);
2636
2637 if(idx % 256 == 0) {
2638 store->Refresh();
2639 }
2640 }
2641
2642 while(num_threads_started < kNumThreads) {
2643 std::this_thread::yield();
2644 }
2645 // checkpoint the index (transition from REST to INDEX_CHKPT)
2646 ASSERT_TRUE(store->CheckpointIndex(index_persistence_callback, index_token));
2647 // Ensure that the index checkpoint completes.
2648 while(!index_checkpoint_completed) {
2649 store->CompletePending(false);
2650 }
2651 store->CompletePending(false);
2652
2653 // checkpoint the hybrid log (transition from REST to PREPARE)
2654 ASSERT_TRUE(store->CheckpointHybridLog(hybrid_log_persistence_callback, hybrid_log_token));
2655 // Ensure that the hybrid-log checkpoint completes.
2656 while(num_threads_persistent < kNumThreads) {
2657 store->CompletePending(false);
2658 }
2659
2660 bool result = store->CompletePending(true);
2661 ASSERT_TRUE(result);
2662 store->StopSession();
2663 };
2664
2665 auto upsert_worker = [](store_t* store, uint32_t thread_id) {
2666 assert(thread_id != 0);
2667 session_ids[thread_id] = store->StartSession();
2668 ++num_threads_started;
2669
2670 // update some records
2671 for(uint32_t idx = kNumRecordsPerThread * thread_id;
2672 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
2673 UpsertContext context{ Key{ idx }, idx + 1 };
2674 Status result = store->Upsert(context, upsert_callback, idx + 1);
2675 ASSERT_EQ(Status::Ok, result);
2676
2677 if(idx % 256 == 0) {
2678 store->Refresh();
2679 }
2680 }
2681
2682 // Don't exit this session until the checkpoint has completed.
2683 while(num_threads_persistent < kNumThreads) {
2684 store->CompletePending(false);
2685 }
2686
2687 bool result = store->CompletePending(true);
2688 ASSERT_TRUE(result);
2689 store->StopSession();
2690 };
2691
2692 std::deque<std::thread> threads{};
2693 threads.emplace_back(upsert_checkpoint_worker, &store, 0);
2694 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
2695 threads.emplace_back(upsert_worker, &store, idx);
2696 }
2697 for(auto& thread : threads) {
2698 thread.join();
2699 }
2700
2701 // Verify the store.
2702 store.StartSession();
2703 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
2704 auto callback = [](IAsyncContext* ctxt, Status result) {
2705 CallbackContext<ReadContext1> context{ ctxt };
2706 ASSERT_EQ(Status::Ok, result);
2707 ASSERT_EQ(context->expected, context->val());
2708 };
2709
2710 ReadContext1 context{ Key{ idx }, idx + 1 };
2711 Status result = store.Read(context, callback, 1);
2712 if(result != Status::Ok) {
2713 ASSERT_EQ(Status::Pending, result);
2714 }
2715 if(idx % 256 == 0) {
2716 store.Refresh();
2717 store.CompletePending(false);
2718 }
2719 }
2720
2721 bool result = store.CompletePending(true);
2722 ASSERT_TRUE(result);
2723 store.StopSession();
2724 }
2725
2726 // Test recovery.
2727 store_t new_store{ 524288, 201326592, "storage", 0.4 };
2728
2729 uint32_t version;
2730 std::vector<Guid> recovered_session_ids;
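  // Recover from both tokens, since the index and hybrid-log checkpoints were taken separately.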
2731 Status status = new_store.Recover(index_token, hybrid_log_token, version, recovered_session_ids);
2732 ASSERT_EQ(recovered_session_ids.size(), kNumThreads);
2733 ASSERT_EQ(Status::Ok, status);
2734
2735 static std::atomic<uint32_t> records_read;
2736 records_read = 0;
2737
2738 class ReadContext2 : public IAsyncContext {
2739 public:
2740 typedef Key key_t;
2741 typedef Value value_t;
2742
2743 ReadContext2(Key key, uint32_t expected_, uint32_t idx_, std::atomic<bool>* found_)
2744 : key_{ key }
2745 , val_{ 0 }
2746 , expected{ expected_ }
2747 , idx{ idx_ }
2748 , found{ found_ } {
2749 }
2750
2751 /// Copy (and deep-copy) constructor.
2752 ReadContext2(const ReadContext2& other)
2753 : key_{ other.key_ }
2754 , val_{ other.val_ }
2755 , expected{ other.expected }
2756 , idx{ other.idx }
2757 , found{ other.found } {
2758 }
2759
2760 /// The implicit and explicit interfaces require a key() accessor.
2761 inline const Key& key() const {
2762 return key_;
2763 }
2764
2765 inline void Get(const Value& value) {
2766 val_ = value.val_;
2767 }
2768 inline void GetAtomic(const Value& value) {
2769 val_ = value.atomic_val_.load();
2770 }
2771
2772 uint64_t val() const {
2773 return val_;
2774 }
2775
2776 protected:
2777 /// The explicit interface requires a DeepCopy_Internal() implementation.
2778 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2779 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2780 }
2781
2782 private:
2783 Key key_;
2784 uint32_t val_;
2785 public:
2786 const uint32_t expected;
2787 const uint32_t idx;
2788 std::atomic<bool>* found;
2789 };
2790
2791 auto read_worker = [](store_t* store, uint32_t thread_id) {
2792 uint64_t serial_num = store->ContinueSession(session_ids[thread_id]);
2793 ASSERT_GE(serial_num, 1);
2794
    std::unique_ptr<std::atomic<bool>[]> found{ new std::atomic<bool>[kNumRecordsPerThread] };
2796 std::memset(found.get(), 0, sizeof(found.get()[0]) * kNumRecordsPerThread);
2797
2798 // verify records
2799 auto callback = [](IAsyncContext* ctxt, Status result) {
2800 CallbackContext<ReadContext2> context{ ctxt };
2801 ASSERT_EQ(Status::Ok, result);
2802 if(context->expected == context->val()) {
2803 bool expected = false;
2804 ASSERT_TRUE(context->found[context->idx].compare_exchange_strong(expected, true));
2805 } else {
2806 ASSERT_EQ(999, context->val());
2807 bool expected = false;
2808 ASSERT_FALSE(context->found[context->idx].load());
2809 }
2810 };
2811 for(uint32_t idx = kNumRecordsPerThread * thread_id;
2812 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
2813 ReadContext2 context{ Key{ idx }, idx + 1, idx - (kNumRecordsPerThread * thread_id),
2814 found.get() };
2815 Status result = store->Read(context, callback, 1);
2816 if(result == Status::Ok) {
2817 ++records_read;
2818 if(context.expected == context.val()) {
2819 bool expected = false;
2820 ASSERT_TRUE(found.get()[context.idx].compare_exchange_strong(expected, true));
2821 } else {
2822 ASSERT_EQ(999, context.val());
2823 bool expected = false;
2824 ASSERT_FALSE(found.get()[context.idx].load());
2825 }
2826 } else {
2827 ASSERT_EQ(Status::Pending, result) << idx;
2828 }
2829 if(idx % 256 == 0) {
2830 store->Refresh();
2831 store->CompletePending(false);
2832 }
2833 }
2834 store->CompletePending(true);
2835 store->StopSession();
2836
2837 bool found_all = true;
2838 for(uint32_t idx = 0; idx < kNumRecordsPerThread; ++idx) {
2839 if(found_all != found.get()[idx]) {
2840 // Consistent-point recovery implies that after one record isn't found, all subsequent
2841 // records will not be found.
2842 Key key{ kNumRecordsPerThread* thread_id + idx };
2843 KeyHash hash = key.GetHash();
2844 std::string error;
2845 error += "key = ";
2846 error += std::to_string(kNumRecordsPerThread* thread_id + idx);
2847 error += ", idx = ";
2848 error += std::to_string(hash.idx(8192));
2849 error += ", tag = ";
2850 error += std::to_string(hash.tag());
2851 ASSERT_TRUE(found_all) << error;
2852 found_all = false;
2853 }
2854 }
2855 };
2856
2857 std::deque<std::thread> threads{};
2858 for(uint32_t idx = 0; idx < kNumThreads; ++idx) {
2859 threads.emplace_back(read_worker, &new_store, idx);
2860 }
2861 for(auto& thread : threads) {
2862 thread.join();
2863 }
2864
2865 ASSERT_GT(records_read, (uint32_t)0);
2866 ASSERT_LE(records_read, kNumRecords);
2867}
2868
2869TEST(CLASS, Concurrent_Rmw_Small) {
2870 class RmwContext;
2871
2872 class Key {
2873 public:
2874 Key(uint32_t key)
2875 : key_{ key } {
2876 }
2877
2878 inline static constexpr uint32_t size() {
2879 return static_cast<uint32_t>(sizeof(Key));
2880 }
2881 inline KeyHash GetHash() const {
2882 std::hash<uint32_t> hash_fn{};
2883 return KeyHash{ hash_fn(key_) };
2884 }
2885
2886 /// Comparison operators.
2887 inline bool operator==(const Key& other) const {
2888 return key_ == other.key_;
2889 }
2890 inline bool operator!=(const Key& other) const {
2891 return key_ != other.key_;
2892 }
2893
2894 friend class RmwContext;
2895
2896 private:
2897 uint32_t key_;
2898 };
2899 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
2900 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
2901
2902 class ReadContext1;
2903 class ReadContext2;
2904
2905 class Value {
2906 public:
2907 Value()
2908 : val_{ 0 } {
2909 }
2910
2911 inline static constexpr uint32_t size() {
2912 return static_cast<uint32_t>(sizeof(Value));
2913 }
2914
2915 friend class RmwContext;
2916 friend class ReadContext1;
2917 friend class ReadContext2;
2918
2919 private:
2920 union {
2921 std::atomic<uint32_t> atomic_val_;
2922 uint32_t val_;
2923 };
2924 };
2925 static_assert(sizeof(Value) == 4, "sizeof(Value) != 4");
2926 static_assert(alignof(Value) == 4, "alignof(Value) != 4");
2927
2928 class RmwContext : public IAsyncContext {
2929 public:
2930 typedef Key key_t;
2931 typedef Value value_t;
2932
2933 RmwContext(const Key& key, uint32_t delta)
2934 : key_{ key }
2935 , delta_{ delta } {
2936 }
2937
2938 /// Copy (and deep-copy) constructor.
2939 RmwContext(const RmwContext& other)
2940 : key_{ other.key_ }
2941 , delta_{ other.delta_ } {
2942 }
2943
2944 /// The implicit and explicit interfaces require a key() accessor.
2945 inline const Key& key() const {
2946 return key_;
2947 }
2948 inline static constexpr uint32_t value_size() {
2949 return sizeof(value_t);
2950 }
    /// RMW methods: RmwInitial() seeds a missing key, RmwCopy() builds a new record from the
    /// old value, and RmwAtomic() updates the record in place.
2952 inline void RmwInitial(Value& value) {
2953 value.val_ = key_.key_;
2954 }
2955 inline void RmwCopy(const value_t& old_value, value_t& value) {
2956 value.val_ = old_value.val_ + delta_;
2957 }
2958 inline bool RmwAtomic(value_t& value) {
2959 value.atomic_val_ += delta_;
2960 return true;
2961 }
2962 protected:
2963 /// The explicit interface requires a DeepCopy_Internal() implementation.
2964 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2965 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2966 }
2967
2968 private:
2969 Key key_;
2970 uint32_t delta_;
2971 };
2972
2973 std::experimental::filesystem::create_directories("storage");
2974
2975 static constexpr uint32_t kNumRecords = 200000;
2976 static constexpr uint32_t kNumThreads = 2;
2977 static constexpr uint32_t kNumRecordsPerThread = kNumRecords / kNumThreads;
2978
2979 static Guid session_ids[kNumThreads];
2980 std::memset(session_ids, 0, sizeof(session_ids));
2981 static Guid token;
2982
2983 static std::atomic<uint32_t> num_threads_persistent;
2984 num_threads_persistent = 0;
2985 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads] = {};
2986 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
2987 threads_persistent[idx] = false;
2988 }
2989
2990 static std::atomic<uint32_t> num_threads_started;
2991 num_threads_started = 0;
2992
2993 static auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
2994 bool expected = false;
2995 ASSERT_EQ(Status::Ok, result);
2996 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected, true));
2997 ++num_threads_persistent;
2998 };
2999
3000 typedef FasterKv<Key, Value, disk_t> store_t;
3001
3002 class ReadContext1 : public IAsyncContext {
3003 public:
3004 typedef Key key_t;
3005 typedef Value value_t;
3006
3007 ReadContext1(Key key, uint32_t expected_)
3008 : key_{ key }
3009 , val_{ 0 }
3010 , expected{ expected_ } {
3011 }
3012
3013 /// Copy (and deep-copy) constructor.
3014 ReadContext1(const ReadContext1& other)
3015 : key_{ other.key_ }
3016 , val_{ other.val_ }
3017 , expected{ other.expected } {
3018 }
3019
3020 /// The implicit and explicit interfaces require a key() accessor.
3021 inline const Key& key() const {
3022 return key_;
3023 }
3024
3025 inline void Get(const Value& value) {
3026 val_ = value.val_;
3027 }
3028 inline void GetAtomic(const Value& value) {
3029 val_ = value.atomic_val_.load();
3030 }
3031
3032 uint64_t val() const {
3033 return val_;
3034 }
3035
3036 protected:
3037 /// The explicit interface requires a DeepCopy_Internal() implementation.
3038 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
3039 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
3040 }
3041
3042 private:
3043 Key key_;
3044 uint32_t val_;
3045 public:
3046 const uint32_t expected;
3047 };
3048
3049 {
    // 12 pages!
3051 store_t store{ 8192, 402653184, "storage", 0.4 };
3052
3053 // Populate the store.
3054 store.StartSession();
3055 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
3056 auto callback = [](IAsyncContext* context, Status result) {
3057 ASSERT_EQ(Status::Ok, result);
3058 };
3059
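      // The first Rmw() of each key takes the RmwInitial() path, seeding the value with the key
      // itself; later Rmws add the delta.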
3060 RmwContext context{ Key{ idx }, 230 };
3061 Status result = store.Rmw(context, callback, 1);
3062 ASSERT_EQ(Status::Ok, result);
3063 if(idx % 256 == 0) {
3064 store.Refresh();
3065 store.CompletePending(false);
3066 }
3067 }
3068 store.StopSession();
3069
3070 /// Read-modify-write and checkpoint the store.
3071 auto rmw_checkpoint_worker = [](store_t* store, uint32_t thread_id) {
3072 assert(thread_id == 0);
3073 session_ids[thread_id] = store->StartSession();
3074 ++num_threads_started;
3075
3076 // read-modify-write some records
3077 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3078 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3079 auto callback = [](IAsyncContext* context, Status result) {
3080 ASSERT_EQ(Status::Ok, result);
3081 };
3082 RmwContext context{ Key{ idx }, 230 };
3083 Status result = store->Rmw(context, callback, idx + 1);
3084 ASSERT_EQ(Status::Ok, result);
3085
3086 if(idx % 256 == 0) {
3087 store->Refresh();
3088 store->CompletePending(false);
3089 }
3090 }
3091
3092 while(num_threads_started < kNumThreads) {
3093 std::this_thread::yield();
3094 }
3095 // checkpoint (transition from REST to INDEX_CHKPT)
3096 ASSERT_TRUE(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token));
3097
3098 // Ensure that the checkpoint completes.
3099 while(num_threads_persistent < kNumThreads) {
3100 store->CompletePending(false);
3101 }
3102
3103 bool result = store->CompletePending(true);
3104 ASSERT_TRUE(result);
3105 store->StopSession();
3106 };
3107
3108 auto rmw_worker = [](store_t* store, uint32_t thread_id) {
3109 assert(thread_id != 0);
3110 session_ids[thread_id] = store->StartSession();
3111 ++num_threads_started;
3112
3113 // update some records
3114 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3115 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3116 auto callback = [](IAsyncContext* context, Status result) {
3117 ASSERT_EQ(Status::Ok, result);
3118 };
3119 RmwContext context{ Key{ idx }, 230 };
3120 Status result = store->Rmw(context, callback, idx + 1);
3121 ASSERT_EQ(Status::Ok, result);
3122
3123 if(idx % 256 == 0) {
3124 store->Refresh();
3125 store->CompletePending(false);
3126 }
3127 }
3128
3129 // Don't exit this session until the checkpoint has completed.
3130 while(num_threads_persistent < kNumThreads) {
3131 store->CompletePending(false);
3132 }
3133
3134 bool result = store->CompletePending(true);
3135 ASSERT_TRUE(result);
3136 store->StopSession();
3137 };
3138
3139 std::deque<std::thread> threads{};
3140 threads.emplace_back(rmw_checkpoint_worker, &store, 0);
3141 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
3142 threads.emplace_back(rmw_worker, &store, idx);
3143 }
3144 for(auto& thread : threads) {
3145 thread.join();
3146 }
3147
3148 // Verify the store.
3149 store.StartSession();
3150 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
3151 auto callback = [](IAsyncContext* ctxt, Status result) {
3152 CallbackContext<ReadContext1> context{ ctxt };
3153 ASSERT_EQ(Status::Ok, result);
3154 ASSERT_EQ(context->expected, context->val());
3155 };
3156
3157 ReadContext1 context{ Key{ idx }, idx + 230 };
3158 Status result = store.Read(context, callback, 1);
3159 if(result != Status::Ok) {
3160 ASSERT_EQ(Status::Pending, result);
3161 }
3162 }
3163 store.StopSession();
3164 }
3165
3166 // Test recovery.
3167 store_t new_store{ 8192, 402653184, "storage", 0.4 };
3168
3169 uint32_t version;
3170 std::vector<Guid> recovered_session_ids;
3171 Status status = new_store.Recover(token, token, version, recovered_session_ids);
3172 ASSERT_EQ(recovered_session_ids.size(), kNumThreads);
3173 ASSERT_EQ(Status::Ok, status);
3174
3175 static std::atomic<uint32_t> records_read;
3176 records_read = 0;
3177
3178 class ReadContext2 : public IAsyncContext {
3179 public:
3180 typedef Key key_t;
3181 typedef Value value_t;
3182
3183 ReadContext2(Key key, uint32_t expected_, uint32_t idx_, std::atomic<bool>* found_)
3184 : key_{ key }
3185 , val_{ 0 }
3186 , expected{ expected_ }
3187 , idx{ idx_ }
3188 , found{ found_ } {
3189 }
3190
3191 /// Copy (and deep-copy) constructor.
3192 ReadContext2(const ReadContext2& other)
3193 : key_{ other.key_ }
3194 , val_{ other.val_ }
3195 , expected{ other.expected }
3196 , idx{ other.idx }
3197 , found{ other.found } {
3198 }
3199
3200 /// The implicit and explicit interfaces require a key() accessor.
3201 inline const Key& key() const {
3202 return key_;
3203 }
3204
3205 inline void Get(const Value& value) {
3206 val_ = value.val_;
3207 }
3208 inline void GetAtomic(const Value& value) {
3209 val_ = value.atomic_val_.load();
3210 }
3211
3212 uint64_t val() const {
3213 return val_;
3214 }
3215
3216 protected:
3217 /// The explicit interface requires a DeepCopy_Internal() implementation.
3218 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
3219 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
3220 }
3221
3222 private:
3223 Key key_;
3224 uint32_t val_;
3225 public:
3226 const uint32_t expected;
3227 const uint32_t idx;
3228 std::atomic<bool>* found;
3229 };
3230
3231 auto read_worker = [](store_t* store, uint32_t thread_id) {
3232 uint64_t serial_num = store->ContinueSession(session_ids[thread_id]);
3233 ASSERT_GE(serial_num, 1);
3234
    std::unique_ptr<std::atomic<bool>[]> found{ new std::atomic<bool>[kNumRecordsPerThread] };
3236 std::memset(found.get(), 0, sizeof(found.get()[0]) * kNumRecordsPerThread);
3237
3238 // verify records
3239 auto callback = [](IAsyncContext* ctxt, Status result) {
3240 CallbackContext<ReadContext2> context{ ctxt };
3241 ASSERT_EQ(Status::Ok, result);
3242 if(context->expected == context->val()) {
3243 bool expected = false;
3244 ASSERT_TRUE(context->found[context->idx].compare_exchange_strong(expected, true));
3245 } else {
3246 ASSERT_EQ(context->expected - 230, context->val());
3247 bool expected = false;
3248 ASSERT_FALSE(context->found[context->idx].load());
3249 }
3250 };
3251 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3252 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3253 ReadContext2 context{ Key{ idx }, idx + 230, idx - (kNumRecordsPerThread * thread_id),
3254 found.get() };
3255 Status result = store->Read(context, callback, 1);
3256 if(result == Status::Ok) {
3257 ++records_read;
3258 if(context.expected == context.val()) {
3259 bool expected = false;
3260 ASSERT_TRUE(found.get()[context.idx].compare_exchange_strong(expected, true));
3261 } else {
3262 ASSERT_EQ(idx, context.val());
3263 bool expected = false;
3264 ASSERT_FALSE(found.get()[context.idx].load());
3265 }
3266 } else {
3267 ASSERT_EQ(Status::Pending, result);
3268 }
3269 if(idx % 256 == 0) {
3270 store->Refresh();
3271 store->CompletePending(false);
3272 }
3273 }
3274 store->CompletePending(true);
3275 store->StopSession();
3276
3277 bool found_all = true;
3278 for(uint32_t idx = 0; idx < kNumRecordsPerThread; ++idx) {
3279 if(found_all != found.get()[idx]) {
3280 // Consistent-point recovery implies that after one record isn't found, all subsequent
3281 // records will not be found.
3282 Key key{ kNumRecordsPerThread* thread_id + idx };
3283 KeyHash hash = key.GetHash();
3284 std::string error;
3285 error += "key = ";
3286 error += std::to_string(kNumRecordsPerThread* thread_id + idx);
3287 error += ", idx = ";
3288 error += std::to_string(hash.idx(8192));
3289 error += ", tag = ";
3290 error += std::to_string(hash.tag());
3291 ASSERT_TRUE(found_all) << error;
3292 found_all = false;
3293 }
3294 }
3295 };
3296
3297 std::deque<std::thread> threads{};
3298 for(uint32_t idx = 0; idx < kNumThreads; ++idx) {
3299 threads.emplace_back(read_worker, &new_store, idx);
3300 }
3301 for(auto& thread : threads) {
3302 thread.join();
3303 }
3304
3305 ASSERT_GT(records_read, (uint32_t)0);
3306 ASSERT_LE(records_read, kNumRecords);
3307}
3308
3309TEST(CLASS, Concurrent_Rmw_Large) {
3310 class RmwContext;
3311
3312 class Key {
3313 public:
3314 Key(uint32_t key)
3315 : key_{ key } {
3316 }
3317
3318 inline static constexpr uint32_t size() {
3319 return static_cast<uint32_t>(sizeof(Key));
3320 }
3321 inline KeyHash GetHash() const {
3322 std::hash<uint32_t> hash_fn{};
3323 return KeyHash{ hash_fn(key_) };
3324 }
3325
3326 /// Comparison operators.
3327 inline bool operator==(const Key& other) const {
3328 return key_ == other.key_;
3329 }
3330 inline bool operator!=(const Key& other) const {
3331 return key_ != other.key_;
3332 }
3333
3334 friend class RmwContext;
3335
3336 private:
3337 uint32_t key_;
3338 };
3339 static_assert(sizeof(Key) == 4, "sizeof(Key) != 4");
3340 static_assert(alignof(Key) == 4, "alignof(Key) != 4");
3341
3342 class ReadContext1;
3343 class ReadContext2;
3344
3345 class Value {
3346 public:
3347 Value()
3348 : val_{ 0 } {
3349 }
3350
3351 inline static constexpr uint32_t size() {
3352 return static_cast<uint32_t>(sizeof(Value));
3353 }
3354
3355 friend class RmwContext;
3356 friend class ReadContext1;
3357 friend class ReadContext2;
3358
3359 private:
3360 union {
3361 std::atomic<uint32_t> atomic_val_;
3362 uint32_t val_;
3363 };
3364 };
3365 static_assert(sizeof(Value) == 4, "sizeof(Value) != 4");
3366 static_assert(alignof(Value) == 4, "alignof(Value) != 4");
3367
3368 class RmwContext : public IAsyncContext {
3369 public:
3370 typedef Key key_t;
3371 typedef Value value_t;
3372
3373 RmwContext(const Key& key, uint32_t delta)
3374 : key_{ key }
3375 , delta_{ delta } {
3376 }
3377
3378 /// Copy (and deep-copy) constructor.
3379 RmwContext(const RmwContext& other)
3380 : key_{ other.key_ }
3381 , delta_{ other.delta_ } {
3382 }
3383
3384 /// The implicit and explicit interfaces require a key() accessor.
3385 inline const Key& key() const {
3386 return key_;
3387 }
3388 inline static constexpr uint32_t value_size() {
3389 return sizeof(value_t);
3390 }
    /// RMW methods: RmwInitial() seeds a missing key, RmwCopy() builds a new record from the
    /// old value, and RmwAtomic() updates the record in place.
3392 inline void RmwInitial(Value& value) {
3393 value.val_ = key_.key_;
3394 }
3395 inline void RmwCopy(const value_t& old_value, value_t& value) {
3396 value.val_ = old_value.val_ + delta_;
3397 }
3398 inline bool RmwAtomic(value_t& value) {
3399 value.atomic_val_ += delta_;
3400 return true;
3401 }
3402 protected:
3403 /// The explicit interface requires a DeepCopy_Internal() implementation.
3404 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
3405 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
3406 }
3407
3408 private:
3409 Key key_;
3410 uint32_t delta_;
3411 };
3412
3413 std::experimental::filesystem::create_directories("storage");
3414
3415 static constexpr uint32_t kNumRecords = 1000000;
3416 static constexpr uint32_t kNumThreads = 2;
3417 static_assert(kNumRecords % kNumThreads == 0, "kNumRecords % kNumThreads != 0");
3418 static constexpr uint32_t kNumRecordsPerThread = kNumRecords / kNumThreads;
3419
3420 static Guid session_ids[kNumThreads];
3421 std::memset(session_ids, 0, sizeof(session_ids));
3422 static Guid token;
3423
3424 static std::atomic<uint32_t> num_threads_persistent;
3425 num_threads_persistent = 0;
3426 static std::atomic<bool> threads_persistent[Thread::kMaxNumThreads];
3427 for(size_t idx = 0; idx < Thread::kMaxNumThreads; ++idx) {
3428 threads_persistent[idx] = false;
3429 }
3430
3431 static std::atomic<uint32_t> num_threads_started;
3432 num_threads_started = 0;
3433
3434 static auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
3435 bool expected = false;
3436 ASSERT_EQ(Status::Ok, result);
3437 ASSERT_TRUE(threads_persistent[Thread::id()].compare_exchange_strong(expected, true));
3438 ++num_threads_persistent;
3439 };
3440
3441 typedef FasterKv<Key, Value, disk_t> store_t;
3442
3443 class ReadContext1 : public IAsyncContext {
3444 public:
3445 typedef Key key_t;
3446 typedef Value value_t;
3447
3448 ReadContext1(Key key, uint32_t expected_)
3449 : key_{ key }
3450 , val_{ 0 }
3451 , expected{ expected_ } {
3452 }
3453
3454 /// Copy (and deep-copy) constructor.
3455 ReadContext1(const ReadContext1& other)
3456 : key_{ other.key_ }
3457 , val_{ other.val_ }
3458 , expected{ other.expected } {
3459 }
3460
3461 /// The implicit and explicit interfaces require a key() accessor.
3462 inline const Key& key() const {
3463 return key_;
3464 }
3465
3466 inline void Get(const Value& value) {
3467 val_ = value.val_;
3468 }
3469 inline void GetAtomic(const Value& value) {
3470 val_ = value.atomic_val_.load();
3471 }
3472
3473 uint64_t val() const {
3474 return val_;
3475 }
3476
3477 protected:
3478 /// The explicit interface requires a DeepCopy_Internal() implementation.
3479 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
3480 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
3481 }
3482
3483 private:
3484 Key key_;
3485 uint32_t val_;
3486 public:
3487 const uint32_t expected;
3488 };
3489
3490 {
    // 12 pages!
3492 store_t store{ 524288, 402653184, "storage", 0.4 };
3493
3494 // Populate the store.
3495 auto populate_worker0 = [](store_t* store, uint32_t thread_id) {
3496 store->StartSession();
3497 auto callback = [](IAsyncContext* context, Status result) {
3498 ASSERT_EQ(Status::Ok, result);
3499 };
3500 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3501 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3502 RmwContext context{ Key{ idx }, 230 };
3503 Status result = store->Rmw(context, callback, 1);
3504 ASSERT_EQ(Status::Ok, result);
3505 if(idx % 256 == 0) {
3506 store->Refresh();
3507 store->CompletePending(false);
3508 }
3509 }
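      // Thread 0 also doubles the hash index while the store is live; the recovery step below
      // opens the new store with the doubled table size.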
3510 store->GrowIndex(nullptr);
3511 store->StopSession();
3512 };
3513 auto populate_worker = [](store_t* store, uint32_t thread_id) {
3514 store->StartSession();
3515 auto callback = [](IAsyncContext* context, Status result) {
3516 ASSERT_EQ(Status::Ok, result);
3517 };
3518 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3519 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3520 RmwContext context{ Key{ idx }, 230 };
3521 Status result = store->Rmw(context, callback, 1);
3522 ASSERT_EQ(Status::Ok, result);
3523 if(idx % 256 == 0) {
3524 store->Refresh();
3525 store->CompletePending(false);
3526 }
3527 }
3528 store->StopSession();
3529 };
3530
3531 std::deque<std::thread> threads{};
3532 threads.emplace_back(populate_worker0, &store, 0);
3533 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
3534 threads.emplace_back(populate_worker, &store, idx);
3535 }
3536 for(auto& thread : threads) {
3537 thread.join();
3538 }
3539
3540 /// Read-modify-write and checkpoint the store.
3541 auto rmw_checkpoint_worker = [](store_t* store, uint32_t thread_id) {
3542 assert(thread_id == 0);
3543 session_ids[thread_id] = store->StartSession();
3544 ++num_threads_started;
3545
3546 // read-modify-write some records
3547 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3548 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3549 auto callback = [](IAsyncContext* context, Status result) {
3550 ASSERT_EQ(Status::Ok, result);
3551 };
3552 RmwContext context{ Key{ idx }, 230 };
3553 Status result = store->Rmw(context, callback, idx + 1);
3554 ASSERT_TRUE(result == Status::Ok || result == Status::Pending);
3555 if(idx % 256 == 0) {
3556 store->Refresh();
3557 store->CompletePending(false);
3558 }
3559 }
3560
3561 while(num_threads_started < kNumThreads) {
3562 std::this_thread::yield();
3563 }
3564 // checkpoint (transition from REST to INDEX_CHKPT)
3565 ASSERT_TRUE(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token));
3566
3567 // Ensure that the checkpoint completes.
3568 while(num_threads_persistent < kNumThreads) {
3569 store->CompletePending(false);
3570 }
3571
3572 bool result = store->CompletePending(true);
3573 ASSERT_TRUE(result);
3574 store->StopSession();
3575 };
3576
3577 auto rmw_worker = [](store_t* store, uint32_t thread_id) {
3578 assert(thread_id != 0);
3579 session_ids[thread_id] = store->StartSession();
3580 ++num_threads_started;
3581
3582 // update some records
3583 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3584 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3585 auto callback = [](IAsyncContext* context, Status result) {
3586 ASSERT_EQ(Status::Ok, result);
3587 };
3588 RmwContext context{ Key{ idx }, 230 };
3589 Status result = store->Rmw(context, callback, idx + 1);
3590 ASSERT_TRUE(result == Status::Ok || result == Status::Pending);
3591 if(idx % 256 == 0) {
3592 store->Refresh();
3593 store->CompletePending(false);
3594 }
3595 }
3596
3597 // Don't exit this session until the checkpoint has completed.
3598 while(num_threads_persistent < kNumThreads) {
3599 store->CompletePending(false);
3600 }
3601
3602 bool result = store->CompletePending(true);
3603 ASSERT_TRUE(result);
3604 store->StopSession();
3605 };
3606
3607 threads.clear();
3608 threads.emplace_back(rmw_checkpoint_worker, &store, 0);
3609 for(uint32_t idx = 1; idx < kNumThreads; ++idx) {
3610 threads.emplace_back(rmw_worker, &store, idx);
3611 }
3612 for(auto& thread : threads) {
3613 thread.join();
3614 }
3615
3616 // Verify the store.
3617 store.StartSession();
3618 for(uint32_t idx = 0; idx < kNumRecords; ++idx) {
3619 auto callback = [](IAsyncContext* ctxt, Status result) {
3620 CallbackContext<ReadContext1> context{ ctxt };
3621 ASSERT_EQ(Status::Ok, result);
3622 ASSERT_EQ(context->expected, context->val());
3623 };
3624
3625 ReadContext1 context{ Key{ idx }, idx + 230 };
3626 Status result = store.Read(context, callback, 1);
3627 if(result != Status::Ok) {
3628 ASSERT_EQ(Status::Pending, result);
3629 }
3630 }
3631 store.StopSession();
3632 }
3633
3634 // Test recovery.
3635 store_t new_store{ 524288 * 2, 402653184, "storage", 0.4 };
3636
3637 uint32_t version;
3638 std::vector<Guid> recovered_session_ids;
3639 Status status = new_store.Recover(token, token, version, recovered_session_ids);
3640 ASSERT_EQ(recovered_session_ids.size(), kNumThreads);
3641 ASSERT_EQ(Status::Ok, status);
3642
3643 static std::atomic<uint32_t> records_read;
3644 records_read = 0;
3645
3646 class ReadContext2 : public IAsyncContext {
3647 public:
3648 typedef Key key_t;
3649 typedef Value value_t;
3650
3651 ReadContext2(Key key, uint32_t expected_, uint32_t idx_, std::atomic<bool>* found_)
3652 : key_{ key }
3653 , val_{ 0 }
3654 , expected{ expected_ }
3655 , idx{ idx_ }
3656 , found{ found_ } {
3657 }
3658
3659 /// Copy (and deep-copy) constructor.
3660 ReadContext2(const ReadContext2& other)
3661 : key_{ other.key_ }
3662 , val_{ other.val_ }
3663 , expected{ other.expected }
3664 , idx{ other.idx }
3665 , found{ other.found } {
3666 }
3667
3668 /// The implicit and explicit interfaces require a key() accessor.
3669 inline const Key& key() const {
3670 return key_;
3671 }
3672
3673 inline void Get(const Value& value) {
3674 val_ = value.val_;
3675 }
3676 inline void GetAtomic(const Value& value) {
3677 val_ = value.atomic_val_.load();
3678 }
3679
3680 uint64_t val() const {
3681 return val_;
3682 }
3683
3684 protected:
3685 /// The explicit interface requires a DeepCopy_Internal() implementation.
3686 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
3687 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
3688 }
3689
3690 private:
3691 Key key_;
3692 uint32_t val_;
3693 public:
3694 const uint32_t expected;
3695 const uint32_t idx;
3696 std::atomic<bool>* found;
3697 };
3698
3699 auto read_worker = [](store_t* store, uint32_t thread_id) {
3700 uint64_t serial_num = store->ContinueSession(session_ids[thread_id]);
3701 ASSERT_GE(serial_num, 1);
3702
    std::unique_ptr<std::atomic<bool>[]> found{ new std::atomic<bool>[kNumRecordsPerThread] };
3704 std::memset(found.get(), 0, sizeof(found.get()[0]) * kNumRecordsPerThread);
3705
3706 // verify records
3707 auto callback = [](IAsyncContext* ctxt, Status result) {
3708 CallbackContext<ReadContext2> context{ ctxt };
3709 ASSERT_EQ(Status::Ok, result);
3710 if(context->expected == context->val()) {
3711 bool expected = false;
3712 ASSERT_TRUE(context->found[context->idx].compare_exchange_strong(expected, true));
3713 } else {
3714 ASSERT_EQ(context->expected - 230, context->val());
3715 bool expected = false;
3716 ASSERT_FALSE(context->found[context->idx].load());
3717 }
3718 };
3719 for(uint32_t idx = kNumRecordsPerThread * thread_id;
3720 idx < kNumRecordsPerThread * (thread_id + 1); ++idx) {
3721 ReadContext2 context{ Key{ idx }, idx + 230, idx - (kNumRecordsPerThread * thread_id),
3722 found.get() };
3723 Status result = store->Read(context, callback, 1);
3724 if(result == Status::Ok) {
3725 ++records_read;
3726 if(context.expected == context.val()) {
3727 bool expected = false;
3728 ASSERT_TRUE(found.get()[context.idx].compare_exchange_strong(expected, true));
3729 } else {
3730 ASSERT_EQ(idx, context.val());
3731 bool expected = false;
3732 ASSERT_FALSE(found.get()[context.idx].load());
3733 }
3734 } else {
3735 ASSERT_EQ(Status::Pending, result);
3736 }
3737 if(idx % 256 == 0) {
3738 store->Refresh();
3739 store->CompletePending(false);
3740 }
3741 }
3742 store->CompletePending(true);
3743 store->StopSession();
3744
3745 bool found_all = true;
3746 for(uint32_t idx = 0; idx < kNumRecordsPerThread; ++idx) {
3747 if(found_all != found.get()[idx]) {
3748 // Consistent-point recovery implies that after one record isn't found, all subsequent
3749 // records will not be found.
3750 Key key{ kNumRecordsPerThread* thread_id + idx };
3751 KeyHash hash = key.GetHash();
3752 std::string error;
3753 error += "key = ";
3754 error += std::to_string(kNumRecordsPerThread* thread_id + idx);
3755 error += ", idx = ";
3756 error += std::to_string(hash.idx(8192));
3757 error += ", tag = ";
3758 error += std::to_string(hash.tag());
3759 ASSERT_TRUE(found_all) << error;
3760 found_all = false;
3761 }
3762 }
3763 };
3764
3765 std::deque<std::thread> threads{};
3766 for(uint32_t idx = 0; idx < kNumThreads; ++idx) {
3767 threads.emplace_back(read_worker, &new_store, idx);
3768 }
3769 for(auto& thread : threads) {
3770 thread.join();
3771 }
3772
3773 ASSERT_GT(records_read, (uint32_t)0);
3774 ASSERT_LE(records_read, kNumRecords);
3775}
3776