1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
6#include <experimental/filesystem>
7
8using namespace FASTER;
9
10/// Disk's log uses 64 MB segments.
11typedef FASTER::device::FileSystemDisk<handler_t, 67108864L> disk_t;
12
13TEST(CLASS, UpsertRead_Serial) {
14 class Key {
15 public:
16 Key(uint64_t pt1, uint64_t pt2)
17 : pt1_{ pt1 }
18 , pt2_{ pt2 } {
19 }
20
21 inline static constexpr uint32_t size() {
22 return static_cast<uint32_t>(sizeof(Key));
23 }
24 inline KeyHash GetHash() const {
25 std::hash<uint64_t> hash_fn;
26 return KeyHash{ hash_fn(pt1_) };
27 }
28
29 /// Comparison operators.
30 inline bool operator==(const Key& other) const {
31 return pt1_ == other.pt1_ &&
32 pt2_ == other.pt2_;
33 }
34 inline bool operator!=(const Key& other) const {
35 return pt1_ != other.pt1_ ||
36 pt2_ != other.pt2_;
37 }
38
39 private:
40 uint64_t pt1_;
41 uint64_t pt2_;
42 };
43
44 class UpsertContext;
45 class ReadContext;
46
47 class Value {
48 public:
49 Value()
50 : gen_{ 0 }
51 , value_{ 0 }
52 , length_{ 0 } {
53 }
54
55 inline static constexpr uint32_t size() {
56 return static_cast<uint32_t>(sizeof(Value));
57 }
58
59 friend class UpsertContext;
60 friend class ReadContext;
61
62 private:
63 std::atomic<uint64_t> gen_;
64 uint8_t value_[1014];
65 uint16_t length_;
66 };
67 static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024");
68 static_assert(alignof(Value) == 8, "alignof(Value) != 8");
69
70 class UpsertContext : public IAsyncContext {
71 public:
72 typedef Key key_t;
73 typedef Value value_t;
74
75 UpsertContext(const Key& key, uint8_t val)
76 : key_{ key }
77 , val_{ val } {
78 }
79
80 /// Copy (and deep-copy) constructor.
81 UpsertContext(const UpsertContext& other)
82 : key_{ other.key_ }
83 , val_{ other.val_ } {
84 }
85
86 /// The implicit and explicit interfaces require a key() accessor.
87 inline const Key& key() const {
88 return key_;
89 }
90 inline static constexpr uint32_t value_size() {
91 return sizeof(value_t);
92 }
93 /// Non-atomic and atomic Put() methods.
94 inline void Put(Value& value) {
95 value.gen_ = 0;
96 std::memset(value.value_, val_, val_);
97 value.length_ = val_;
98 }
99 inline bool PutAtomic(Value& value) {
100 // Get the lock on the value.
101 uint64_t expected_gen;
102 bool success;
103 do {
104 do {
105 // Spin until other the thread releases the lock.
106 expected_gen = value.gen_.load();
107 } while(expected_gen == UINT64_MAX);
108 // Try to get the lock.
109 success = value.gen_.compare_exchange_weak(expected_gen, UINT64_MAX);
110 } while(!success);
111
112 std::memset(value.value_, val_, val_);
113 value.length_ = val_;
114 // Increment the value's generation number.
115 value.gen_.store(expected_gen + 1);
116 return true;
117 }
118
119 protected:
120 /// The explicit interface requires a DeepCopy_Internal() implementation.
121 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
122 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
123 }
124
125 private:
126 Key key_;
127 uint8_t val_;
128 };
129
130 class ReadContext : public IAsyncContext {
131 public:
132 typedef Key key_t;
133 typedef Value value_t;
134
135 ReadContext(Key key, uint8_t expected)
136 : key_{ key }
137 , expected_{ expected } {
138 }
139
140 /// Copy (and deep-copy) constructor.
141 ReadContext(const ReadContext& other)
142 : key_{ other.key_ }
143 , expected_{ other.expected_ } {
144 }
145
146 /// The implicit and explicit interfaces require a key() accessor.
147 inline const Key& key() const {
148 return key_;
149 }
150
151 inline void Get(const Value& value) {
152 // This is a paging test, so we expect to read stuff from disk.
153 ASSERT_EQ(expected_, value.length_);
154 ASSERT_EQ(expected_, value.value_[expected_ - 5]);
155 }
156 inline void GetAtomic(const Value& value) {
157 uint64_t post_gen = value.gen_.load();
158 uint64_t pre_gen;
159 uint16_t len;
160 uint8_t val;
161 do {
162 // Pre- gen # for this read is last read's post- gen #.
163 pre_gen = post_gen;
164 len = value.length_;
165 val = value.value_[len - 5];
166 post_gen = value.gen_.load();
167 } while(pre_gen != post_gen);
168 ASSERT_EQ(expected_, static_cast<uint8_t>(len));
169 ASSERT_EQ(expected_, val);
170 }
171
172 protected:
173 /// The explicit interface requires a DeepCopy_Internal() implementation.
174 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
175 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
176 }
177
178 private:
179 Key key_;
180 uint8_t expected_;
181 };
182
183 std::experimental::filesystem::create_directories("logs");
184
185 // 8 pages!
186 FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs", 0.5 };
187
188 Guid session_id = store.StartSession();
189
190 constexpr size_t kNumRecords = 250000;
191
192 // Insert.
193 for(size_t idx = 0; idx < kNumRecords; ++idx) {
194 auto callback = [](IAsyncContext* ctxt, Status result) {
195 // Upserts don't go to disk.
196 ASSERT_TRUE(false);
197 };
198
199 if(idx % 256 == 0) {
200 store.Refresh();
201 }
202
203 UpsertContext context{ Key{idx, idx}, 25 };
204 Status result = store.Upsert(context, callback, 1);
205 ASSERT_EQ(Status::Ok, result);
206 }
207 // Read.
208 static std::atomic<uint64_t> records_read{ 0 };
209 for(size_t idx = 0; idx < kNumRecords; ++idx) {
210 auto callback = [](IAsyncContext* ctxt, Status result) {
211 CallbackContext<ReadContext> context{ ctxt };
212 ASSERT_EQ(Status::Ok, result);
213 ++records_read;
214 };
215
216 if(idx % 256 == 0) {
217 store.Refresh();
218 }
219
220 ReadContext context{ Key{ idx, idx}, 25 };
221 Status result = store.Read(context, callback, 1);
222 if(result == Status::Ok) {
223 ++records_read;
224 } else {
225 ASSERT_EQ(Status::Pending, result);
226 }
227 }
228
229 ASSERT_LT(records_read.load(), kNumRecords);
230 bool result = store.CompletePending(true);
231 ASSERT_TRUE(result);
232 ASSERT_EQ(kNumRecords, records_read.load());
233
234 // Update.
235 static std::atomic<uint64_t> records_updated{ 0 };
236 for(size_t idx = 0; idx < kNumRecords; ++idx) {
237 auto callback = [](IAsyncContext* ctxt, Status result) {
238 // Upserts don't go to disk.
239 ASSERT_TRUE(false);
240 };
241
242 if(idx % 256 == 0) {
243 store.Refresh();
244 }
245
246 UpsertContext context{ Key{ idx, idx }, 87 };
247 Status result = store.Upsert(context, callback, 1);
248 if(result == Status::Ok) {
249 ++records_updated;
250 } else {
251 ASSERT_EQ(Status::Pending, result);
252 }
253 }
254
255 ASSERT_EQ(kNumRecords, records_updated.load());
256 result = store.CompletePending(true);
257 ASSERT_TRUE(result);
258
259 // Read again.
260 records_read = 0;;
261 for(size_t idx = 0; idx < kNumRecords; ++idx) {
262 auto callback = [](IAsyncContext* ctxt, Status result) {
263 CallbackContext<ReadContext> context{ ctxt };
264 ASSERT_EQ(Status::Ok, result);
265 ++records_read;
266 };
267
268 if(idx % 256 == 0) {
269 store.Refresh();
270 }
271
272 ReadContext context{ Key{ idx, idx }, 87 };
273 Status result = store.Read(context, callback, 1);
274 if(result == Status::Ok) {
275 ++records_read;
276 } else {
277 ASSERT_EQ(Status::Pending, result);
278 }
279 }
280
281 ASSERT_LT(records_read.load(), kNumRecords);
282 result = store.CompletePending(true);
283 ASSERT_TRUE(result);
284 ASSERT_EQ(kNumRecords, records_read.load());
285
286 store.StopSession();
287}
288
289TEST(CLASS, UpsertRead_Concurrent) {
290 class UpsertContext;
291 class ReadContext;
292
293 class Key {
294 public:
295 Key(uint64_t pt1, uint64_t pt2)
296 : pt1_{ pt1 }
297 , pt2_{ pt2 } {
298 }
299
300 inline static constexpr uint32_t size() {
301 return static_cast<uint32_t>(sizeof(Key));
302 }
303 inline KeyHash GetHash() const {
304 std::hash<uint64_t> hash_fn;
305 return KeyHash{ hash_fn(pt1_) };
306 }
307
308 /// Comparison operators.
309 inline bool operator==(const Key& other) const {
310 return pt1_ == other.pt1_ &&
311 pt2_ == other.pt2_;
312 }
313 inline bool operator!=(const Key& other) const {
314 return pt1_ != other.pt1_ ||
315 pt2_ != other.pt2_;
316 }
317
318 friend class UpsertContext;
319 friend class ReadContext;
320
321 private:
322 uint64_t pt1_;
323 uint64_t pt2_;
324 };
325
326 class Value {
327 public:
328 Value()
329 : gen_{ 0 }
330 , value_{ 0 }
331 , length_{ 0 } {
332 }
333
334 inline static constexpr uint32_t size() {
335 return static_cast<uint32_t>(sizeof(Value));
336 }
337
338 friend class UpsertContext;
339 friend class ReadContext;
340
341 private:
342 std::atomic<uint64_t> gen_;
343 uint8_t value_[1014];
344 uint16_t length_;
345 };
346 static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024");
347 static_assert(alignof(Value) == 8, "alignof(Value) != 8");
348
349 class UpsertContext : public IAsyncContext {
350 public:
351 typedef Key key_t;
352 typedef Value value_t;
353
354 UpsertContext(const Key& key, uint8_t val)
355 : key_{ key }
356 , val_{ val } {
357 }
358
359 /// Copy (and deep-copy) constructor.
360 UpsertContext(const UpsertContext& other)
361 : key_{ other.key_ }
362 , val_{ other.val_ } {
363 }
364
365 /// The implicit and explicit interfaces require a key() accessor.
366 inline const Key& key() const {
367 return key_;
368 }
369 inline static constexpr uint32_t value_size() {
370 return sizeof(value_t);
371 }
372 /// Non-atomic and atomic Put() methods.
373 inline void Put(Value& value) {
374 value.gen_ = 0;
375 std::memset(value.value_, val_, val_);
376 value.length_ = val_;
377 }
378 inline bool PutAtomic(Value& value) {
379 // Get the lock on the value.
380 uint64_t expected_gen;
381 bool success;
382 do {
383 do {
384 // Spin until other the thread releases the lock.
385 expected_gen = value.gen_.load();
386 } while(expected_gen == UINT64_MAX);
387 // Try to get the lock.
388 success = value.gen_.compare_exchange_weak(expected_gen, UINT64_MAX);
389 } while(!success);
390
391 std::memset(value.value_, val_, val_);
392 value.length_ = val_;
393 // Increment the value's generation number.
394 value.gen_.store(expected_gen + 1);
395 return true;
396 }
397
398 protected:
399 /// The explicit interface requires a DeepCopy_Internal() implementation.
400 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
401 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
402 }
403
404 private:
405 Key key_;
406 uint8_t val_;
407 };
408
409 class ReadContext : public IAsyncContext {
410 public:
411 typedef Key key_t;
412 typedef Value value_t;
413
414 ReadContext(Key key, uint8_t expected)
415 : key_{ key }
416 , expected_{ expected } {
417 }
418
419 /// Copy (and deep-copy) constructor.
420 ReadContext(const ReadContext& other)
421 : key_{ other.key_ }
422 , expected_{ other.expected_ } {
423 }
424
425 /// The implicit and explicit interfaces require a key() accessor.
426 inline const Key& key() const {
427 return key_;
428 }
429
430 inline void Get(const Value& value) {
431 // This is a paging test, so we expect to read stuff from disk.
432 ASSERT_EQ(expected_, value.length_);
433 ASSERT_EQ(expected_, value.value_[expected_ - 5]);
434 }
435 inline void GetAtomic(const Value& value) {
436 uint64_t post_gen = value.gen_.load();
437 uint64_t pre_gen;
438 uint16_t len;
439 uint8_t val;
440 do {
441 // Pre- gen # for this read is last read's post- gen #.
442 pre_gen = post_gen;
443 len = value.length_;
444 val = value.value_[len - 5];
445 post_gen = value.gen_.load();
446 } while(pre_gen != post_gen);
447 ASSERT_EQ(expected_, val);
448 }
449
450 protected:
451 /// The explicit interface requires a DeepCopy_Internal() implementation.
452 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
453 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
454 }
455
456 private:
457 Key key_;
458 uint8_t expected_;
459 };
460
461 std::experimental::filesystem::create_directories("logs");
462
463 // 8 pages!
464 FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs", 0.5 };
465
466 static constexpr size_t kNumRecords = 250000;
467 static constexpr size_t kNumThreads = 2;
468
469 static std::atomic<uint64_t> num_writes{ 0 };
470
471 auto upsert_worker = [](FasterKv<Key, Value, disk_t>* store_,
472 size_t thread_idx, uint8_t val) {
473 Guid session_id = store_->StartSession();
474
475 for(size_t idx = 0; idx < kNumRecords / kNumThreads; ++idx) {
476 auto callback = [](IAsyncContext* ctxt, Status result) {
477 // In-memory test.
478 ASSERT_TRUE(false);
479 };
480
481 if(idx % 256 == 0) {
482 store_->Refresh();
483 }
484
485 uint64_t key_component = thread_idx * (kNumRecords / kNumThreads) + idx;
486 UpsertContext context{ Key{ key_component, key_component }, val };
487 Status result = store_->Upsert(context, callback, 1);
488 ASSERT_EQ(Status::Ok, result);
489 ++num_writes;
490 }
491
492 store_->StopSession();
493 };
494
495 // Insert.
496 std::deque<std::thread> threads{};
497 for(size_t idx = 0; idx < kNumThreads; ++idx) {
498 threads.emplace_back(upsert_worker, &store, idx, 25);
499 }
500 for(auto& thread : threads) {
501 thread.join();
502 }
503
504 ASSERT_EQ(kNumRecords, num_writes.load());
505
506 // Read.
507 Guid session_id = store.StartSession();
508
509 static std::atomic<uint64_t> records_read{ 0 };
510 for(size_t idx = 0; idx < kNumRecords; ++idx) {
511 auto callback = [](IAsyncContext* ctxt, Status result) {
512 CallbackContext<ReadContext> context{ ctxt };
513 ASSERT_EQ(Status::Ok, result);
514 ++records_read;
515 };
516
517 if(idx % 256 == 0) {
518 store.Refresh();
519 }
520
521 ReadContext context{ Key{ idx, idx }, 25 };
522 Status result = store.Read(context, callback, 1);
523 if(result == Status::Ok) {
524 ++records_read;
525 } else {
526 ASSERT_EQ(Status::Pending, result) << idx;
527 }
528 }
529
530 ASSERT_LT(records_read.load(), kNumRecords);
531 bool result = store.CompletePending(true);
532 ASSERT_TRUE(result);
533 ASSERT_EQ(kNumRecords, records_read.load());
534
535 //// Update.
536 num_writes = 0;
537 threads.clear();
538 for(size_t idx = 0; idx < kNumThreads; ++idx) {
539 threads.emplace_back(upsert_worker, &store, idx, 87);
540 }
541 for(auto& thread : threads) {
542 thread.join();
543 }
544
545 ASSERT_EQ(kNumRecords, num_writes.load());
546
547 // Delete some old copies of records (160 MB) that we no longer need.
548 static constexpr uint64_t kNewBeginAddress{ 167772160L };
549 static std::atomic<bool> truncated{ false };
550 static std::atomic<bool> complete{ false };
551 auto truncate_callback = [](uint64_t offset) {
552 ASSERT_LE(offset, kNewBeginAddress);
553 truncated = true;
554 };
555 auto complete_callback = []() {
556 complete = true;
557 };
558
559 result = store.ShiftBeginAddress(Address{ kNewBeginAddress }, truncate_callback, complete_callback);
560 ASSERT_TRUE(result);
561
562 while(!truncated || !complete) {
563 store.CompletePending(false);
564 }
565
566 // Read again.
567 records_read = 0;;
568 for(size_t idx = 0; idx < kNumRecords; ++idx) {
569 auto callback = [](IAsyncContext* ctxt, Status result) {
570 CallbackContext<ReadContext> context{ ctxt };
571 ASSERT_EQ(Status::Ok, result);
572 ++records_read;
573 };
574
575 if(idx % 256 == 0) {
576 store.Refresh();
577 }
578
579 ReadContext context{ Key{ idx, idx }, 87 };
580 Status result = store.Read(context, callback, 1);
581 if(result == Status::Ok) {
582 ++records_read;
583 } else {
584 ASSERT_EQ(Status::Pending, result);
585 }
586 }
587
588 ASSERT_LT(records_read.load(), kNumRecords);
589 result = store.CompletePending(true);
590 ASSERT_TRUE(result);
591 ASSERT_EQ(kNumRecords, records_read.load());
592
593 store.StopSession();
594}
595
596TEST(CLASS, Rmw) {
597 class Key {
598 public:
599 Key(uint64_t key)
600 : key_{ key } {
601 }
602
603 inline static constexpr uint32_t size() {
604 return static_cast<uint32_t>(sizeof(Key));
605 }
606 inline KeyHash GetHash() const {
607 return KeyHash{ Utility::GetHashCode(key_) };
608 }
609
610 /// Comparison operators.
611 inline bool operator==(const Key& other) const {
612 return key_ == other.key_;
613 }
614 inline bool operator!=(const Key& other) const {
615 return key_ != other.key_;
616 }
617
618 private:
619 uint64_t key_;
620 };
621
622 class RmwContext;
623
624 class Value {
625 public:
626 Value()
627 : counter_{ 0 }
628 , junk_{ 1 } {
629 }
630
631 inline static constexpr uint32_t size() {
632 return static_cast<uint32_t>(sizeof(Value));
633 }
634
635 friend class RmwContext;
636
637 private:
638 std::atomic<uint64_t> counter_;
639 uint8_t junk_[1016];
640 };
641 static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024");
642 static_assert(alignof(Value) == 8, "alignof(Value) != 8");
643
644 class RmwContext : public IAsyncContext {
645 public:
646 typedef Key key_t;
647 typedef Value value_t;
648
649 RmwContext(Key key, uint64_t incr)
650 : key_{ key }
651 , incr_{ incr }
652 , val_{ 0 } {
653 }
654
655 /// Copy (and deep-copy) constructor.
656 RmwContext(const RmwContext& other)
657 : key_{ other.key_ }
658 , incr_{ other.incr_ }
659 , val_{ other.val_ } {
660 }
661
662 inline const Key& key() const {
663 return key_;
664 }
665 inline static constexpr uint32_t value_size() {
666 return sizeof(value_t);
667 }
668 inline void RmwInitial(Value& value) {
669 value.counter_ = incr_;
670 val_ = value.counter_;
671 }
672 inline void RmwCopy(const Value& old_value, Value& value) {
673 value.counter_ = old_value.counter_ + incr_;
674 val_ = value.counter_;
675 }
676 inline bool RmwAtomic(Value& value) {
677 val_ = value.counter_.fetch_add(incr_) + incr_;
678 return true;
679 }
680
681 inline uint64_t val() const {
682 return val_;
683 }
684
685 protected:
686 /// The explicit interface requires a DeepCopy_Internal() implementation.
687 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
688 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
689 }
690
691 private:
692 Key key_;
693 uint64_t incr_;
694
695 uint64_t val_;
696 };
697
698 std::experimental::filesystem::create_directories("logs");
699
700 // 8 pages!
701 FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs", 0.5 };
702
703 Guid session_id = store.StartSession();
704
705 constexpr size_t kNumRecords = 200000;
706
707 // Initial RMW.
708 static std::atomic<uint64_t> records_touched{ 0 };
709 for(size_t idx = 0; idx < kNumRecords; ++idx) {
710 auto callback = [](IAsyncContext* ctxt, Status result) {
711 CallbackContext<RmwContext> context{ ctxt };
712 ASSERT_EQ(Status::Ok, result);
713 ASSERT_EQ(3, context->val());
714 ++records_touched;
715 };
716
717 if(idx % 256 == 0) {
718 store.Refresh();
719 }
720
721 RmwContext context{ Key{ idx }, 3 };
722 Status result = store.Rmw(context, callback, 1);
723 if(result == Status::Ok) {
724 ASSERT_EQ(3, context.val());
725 ++records_touched;
726 } else {
727 ASSERT_EQ(Status::Pending, result);
728 }
729 }
730
731 bool result = store.CompletePending(true);
732 ASSERT_TRUE(result);
733 ASSERT_EQ(kNumRecords, records_touched.load());
734
735 // Second RMW.
736 records_touched = 0;
737 for(size_t idx = kNumRecords; idx > 0; --idx) {
738 auto callback = [](IAsyncContext* ctxt, Status result) {
739 CallbackContext<RmwContext> context{ ctxt };
740 ASSERT_EQ(Status::Ok, result);
741 ASSERT_EQ(8, context->val());
742 ++records_touched;
743 };
744
745 if(idx % 256 == 0) {
746 store.Refresh();
747 }
748
749 RmwContext context{ Key{ idx - 1 }, 5 };
750 Status result = store.Rmw(context, callback, 1);
751 if(result == Status::Ok) {
752 ASSERT_EQ(8, context.val()) << idx - 1;
753 ++records_touched;
754 } else {
755 ASSERT_EQ(Status::Pending, result);
756 }
757 }
758
759 ASSERT_LT(records_touched.load(), kNumRecords);
760 result = store.CompletePending(true);
761 ASSERT_TRUE(result);
762 ASSERT_EQ(kNumRecords, records_touched.load());
763
764 store.StopSession();
765}
766
767TEST(CLASS, Rmw_Concurrent) {
768 class Key {
769 public:
770 Key(uint64_t key)
771 : key_{ key } {
772 }
773
774 inline static constexpr uint32_t size() {
775 return static_cast<uint32_t>(sizeof(Key));
776 }
777 inline KeyHash GetHash() const {
778 return KeyHash{ Utility::GetHashCode(key_) };
779 }
780
781 /// Comparison operators.
782 inline bool operator==(const Key& other) const {
783 return key_ == other.key_;
784 }
785 inline bool operator!=(const Key& other) const {
786 return key_ != other.key_;
787 }
788
789 private:
790 uint64_t key_;
791 };
792
793 class RmwContext;
794 class ReadContext;
795
796 class Value {
797 public:
798 Value()
799 : counter_{ 0 }
800 , junk_{ 1 } {
801 }
802
803 inline static constexpr uint32_t size() {
804 return static_cast<uint32_t>(sizeof(Value));
805 }
806
807 friend class RmwContext;
808 friend class ReadContext;
809
810 private:
811 std::atomic<uint64_t> counter_;
812 uint8_t junk_[1016];
813 };
814 static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024");
815 static_assert(alignof(Value) == 8, "alignof(Value) != 8");
816
817 class RmwContext : public IAsyncContext {
818 public:
819 typedef Key key_t;
820 typedef Value value_t;
821
822 RmwContext(Key key, uint64_t incr)
823 : key_{ key }
824 , incr_{ incr } {
825 }
826
827 /// Copy (and deep-copy) constructor.
828 RmwContext(const RmwContext& other)
829 : key_{ other.key_ }
830 , incr_{ other.incr_ } {
831 }
832
833 inline const Key& key() const {
834 return key_;
835 }
836 inline static constexpr uint32_t value_size() {
837 return sizeof(value_t);
838 }
839 inline void RmwInitial(Value& value) {
840 value.counter_ = incr_;
841 }
842 inline void RmwCopy(const Value& old_value, Value& value) {
843 value.counter_ = old_value.counter_ + incr_;
844 }
845 inline bool RmwAtomic(Value& value) {
846 value.counter_.fetch_add(incr_);
847 return true;
848 }
849
850 protected:
851 /// The explicit interface requires a DeepCopy_Internal() implementation.
852 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
853 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
854 }
855
856 private:
857 Key key_;
858 uint64_t incr_;
859 };
860
861 class ReadContext : public IAsyncContext {
862 public:
863 typedef Key key_t;
864 typedef Value value_t;
865
866 ReadContext(Key key)
867 : key_{ key } {
868 }
869
870 /// Copy (and deep-copy) constructor.
871 ReadContext(const ReadContext& other)
872 : key_{ other.key_ } {
873 }
874
875 /// The implicit and explicit interfaces require a key() accessor.
876 inline const Key& key() const {
877 return key_;
878 }
879
880 inline void Get(const Value& value) {
881 counter = value.counter_.load(std::memory_order_acquire);
882 }
883 inline void GetAtomic(const Value& value) {
884 counter = value.counter_.load();
885 }
886
887 protected:
888 /// The explicit interface requires a DeepCopy_Internal() implementation.
889 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
890 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
891 }
892
893 private:
894 Key key_;
895 public:
896 uint64_t counter;
897 };
898
899 static constexpr size_t kNumRecords = 150000;
900 static constexpr size_t kNumThreads = 2;
901
902 auto rmw_worker = [](FasterKv<Key, Value, disk_t>* store_, uint64_t incr) {
903 Guid session_id = store_->StartSession();
904 for(size_t idx = 0; idx < kNumRecords; ++idx) {
905 auto callback = [](IAsyncContext* ctxt, Status result) {
906 CallbackContext<RmwContext> context{ ctxt };
907 ASSERT_EQ(Status::Ok, result);
908 };
909
910 if(idx % 256 == 0) {
911 store_->Refresh();
912 }
913
914 RmwContext context{ Key{ idx }, incr };
915 Status result = store_->Rmw(context, callback, 1);
916 if(result != Status::Ok) {
917 ASSERT_EQ(Status::Pending, result);
918 }
919 }
920 bool result = store_->CompletePending(true);
921 ASSERT_TRUE(result);
922 store_->StopSession();
923 };
924
925 auto read_worker1 = [](FasterKv<Key, Value, disk_t>* store_, size_t thread_idx) {
926 Guid session_id = store_->StartSession();
927 for(size_t idx = 0; idx < kNumRecords / kNumThreads; ++idx) {
928 auto callback = [](IAsyncContext* ctxt, Status result) {
929 CallbackContext<ReadContext> context{ ctxt };
930 ASSERT_EQ(Status::Ok, result);
931 ASSERT_EQ(7 * kNumThreads, context->counter);
932 };
933
934 if(idx % 256 == 0) {
935 store_->Refresh();
936 }
937
938 ReadContext context{ Key{ thread_idx* (kNumRecords / kNumThreads) + idx } };
939 Status result = store_->Read(context, callback, 1);
940 if(result == Status::Ok) {
941 ASSERT_EQ(7 * kNumThreads, context.counter);
942 } else {
943 ASSERT_EQ(Status::Pending, result);
944 }
945 }
946 bool result = store_->CompletePending(true);
947 ASSERT_TRUE(result);
948 store_->StopSession();
949 };
950
951 auto read_worker2 = [](FasterKv<Key, Value, disk_t>* store_, size_t thread_idx) {
952 Guid session_id = store_->StartSession();
953 for(size_t idx = 0; idx < kNumRecords / kNumThreads; ++idx) {
954 auto callback = [](IAsyncContext* ctxt, Status result) {
955 CallbackContext<ReadContext> context{ ctxt };
956 ASSERT_EQ(Status::Ok, result);
957 ASSERT_EQ(13 * kNumThreads, context->counter);
958 };
959
960 if(idx % 256 == 0) {
961 store_->Refresh();
962 }
963
964 ReadContext context{ Key{ thread_idx* (kNumRecords / kNumThreads) + idx } };
965 Status result = store_->Read(context, callback, 1);
966 if(result == Status::Ok) {
967 ASSERT_EQ(13 * kNumThreads, context.counter);
968 } else {
969 ASSERT_EQ(Status::Pending, result);
970 }
971 }
972 bool result = store_->CompletePending(true);
973 ASSERT_TRUE(result);
974 store_->StopSession();
975 };
976
977 std::experimental::filesystem::create_directories("logs");
978
979 // 8 pages!
980 FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs", 0.5 };
981
982 // Initial RMW.
983 std::deque<std::thread> threads{};
984 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
985 threads.emplace_back(rmw_worker, &store, 7);
986 }
987 for(auto& thread : threads) {
988 thread.join();
989 }
990
991 // Read.
992 threads.clear();
993 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
994 threads.emplace_back(read_worker1, &store, idx);
995 }
996 for(auto& thread : threads) {
997 thread.join();
998 }
999
1000 // Second RMW.
1001 threads.clear();
1002 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
1003 threads.emplace_back(rmw_worker, &store, 6);
1004 }
1005 for(auto& thread : threads) {
1006 thread.join();
1007 }
1008
1009 // Read again.
1010 threads.clear();
1011 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
1012 threads.emplace_back(read_worker2, &store, idx);
1013 }
1014 for(auto& thread : threads) {
1015 thread.join();
1016 }
1017}
1018