1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#include <cstdint>
5#include <cstring>
6#include <deque>
7#include <functional>
8#include <thread>
9#include "gtest/gtest.h"
10
11#include "core/faster.h"
12#include "device/null_disk.h"
13
14using namespace FASTER::core;

TEST(InMemFaster, UpsertRead) {
16 class alignas(2) Key {
17 public:
18 Key(uint8_t key)
19 : key_{ key } {
20 }
21
22 inline static constexpr uint32_t size() {
23 return static_cast<uint32_t>(sizeof(Key));
24 }
25 inline KeyHash GetHash() const {
26 std::hash<uint8_t> hash_fn;
27 return KeyHash{ hash_fn(key_) };
28 }
29
30 /// Comparison operators.
31 inline bool operator==(const Key& other) const {
32 return key_ == other.key_;
33 }
34 inline bool operator!=(const Key& other) const {
35 return key_ != other.key_;
36 }
37
38 private:
39 uint8_t key_;
40 };
41
42 class UpsertContext;
43 class ReadContext;
44
45 class Value {
46 public:
47 Value()
48 : value_{ 0 } {
49 }
50 Value(const Value& other)
51 : value_{ other.value_ } {
52 }
53 Value(uint8_t value)
54 : value_{ value } {
55 }
56
57 inline static constexpr uint32_t size() {
58 return static_cast<uint32_t>(sizeof(Value));
59 }
60
61 friend class UpsertContext;
62 friend class ReadContext;
63
64 private:
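    // One union member for non-atomic access during the initial insert (Put) and an
    // atomic view for concurrent access (PutAtomic / GetAtomic).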
65 union {
66 uint8_t value_;
67 std::atomic<uint8_t> atomic_value_;
68 };
69 };
70
71 class UpsertContext : public IAsyncContext {
72 public:
73 typedef Key key_t;
74 typedef Value value_t;
75
76 UpsertContext(uint8_t key)
77 : key_{ key } {
78 }
79
80 /// Copy (and deep-copy) constructor.
81 UpsertContext(const UpsertContext& other)
82 : key_{ other.key_ } {
83 }
84
85 /// The implicit and explicit interfaces require a key() accessor.
86 inline const Key& key() const {
87 return key_;
88 }
89 inline static constexpr uint32_t value_size() {
90 return sizeof(value_t);
91 }
92 /// Non-atomic and atomic Put() methods.
93 inline void Put(Value& value) {
94 value.value_ = 23;
95 }
96 inline bool PutAtomic(Value& value) {
97 value.atomic_value_.store(42);
98 return true;
99 }
100
101 protected:
102 /// The explicit interface requires a DeepCopy_Internal() implementation.
103 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
104 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
105 }
106
107 private:
108 Key key_;
109 };
110
111 class ReadContext : public IAsyncContext {
112 public:
113 typedef Key key_t;
114 typedef Value value_t;
115
116 ReadContext(uint8_t key)
117 : key_{ key } {
118 }
119
120 /// Copy (and deep-copy) constructor.
121 ReadContext(const ReadContext& other)
122 : key_{ other.key_ } {
123 }
124
125 /// The implicit and explicit interfaces require a key() accessor.
126 inline const Key& key() const {
127 return key_;
128 }
129
130 inline void Get(const Value& value) {
131 // All reads should be atomic (from the mutable tail).
132 ASSERT_TRUE(false);
133 }
134 inline void GetAtomic(const Value& value) {
135 output = value.atomic_value_.load();
136 }
137
138 protected:
139 /// The explicit interface requires a DeepCopy_Internal() implementation.
140 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
141 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
142 }
143
144 private:
145 Key key_;
146 public:
147 uint8_t output;
148 };
149
150 FasterKv<Key, Value, FASTER::device::NullDisk> store { 128, 1073741824, "" };
151
152 store.StartSession();
153
154 // Insert.
155 for(size_t idx = 0; idx < 256; ++idx) {
156 auto callback = [](IAsyncContext* ctxt, Status result) {
157 // In-memory test.
158 ASSERT_TRUE(false);
159 };
160 UpsertContext context{ static_cast<uint8_t>(idx) };
161 Status result = store.Upsert(context, callback, 1);
162 ASSERT_EQ(Status::Ok, result);
163 }
164 // Read.
165 for(size_t idx = 0; idx < 256; ++idx) {
166 auto callback = [](IAsyncContext* ctxt, Status result) {
167 // In-memory test.
168 ASSERT_TRUE(false);
169 };
170 ReadContext context{ static_cast<uint8_t>(idx) };
171 Status result = store.Read(context, callback, 1);
172 ASSERT_EQ(Status::Ok, result);
    // All the upserts should have been inserts (non-atomic), so Put() stored 23.
174 ASSERT_EQ(23, context.output);
175 }
176 // Update.
177 for(size_t idx = 0; idx < 256; ++idx) {
178 auto callback = [](IAsyncContext* ctxt, Status result) {
179 // In-memory test.
180 ASSERT_TRUE(false);
181 };
182 UpsertContext context{ static_cast<uint8_t>(idx) };
183 Status result = store.Upsert(context, callback, 1);
184 ASSERT_EQ(Status::Ok, result);
185 }
186 // Read again.
187 for(size_t idx = 0; idx < 256; ++idx) {
188 auto callback = [](IAsyncContext* ctxt, Status result) {
189 // In-memory test.
190 ASSERT_TRUE(false);
191 };
192 ReadContext context{ static_cast<uint8_t>(idx) };
193 Status result = store.Read(context, callback, 1);
194 ASSERT_EQ(Status::Ok, result);
    // All the upserts should have been in-place updates (atomic), so PutAtomic() stored 42.
196 ASSERT_EQ(42, context.output);
197 }
198
199 store.StopSession();
200}
201
/// The hash always returns the same value, so the FASTER store devolves into a linked list.
203TEST(InMemFaster, UpsertRead_DummyHash) {
204 class UpsertContext;
205 class ReadContext;
206
207 class Key {
208 public:
209 Key(uint16_t key)
210 : key_{ key } {
211 }
212
213 inline static constexpr uint32_t size() {
214 return static_cast<uint32_t>(sizeof(Key));
215 }
216 inline KeyHash GetHash() const {
217 return KeyHash{ 42 };
218 }
219
220 /// Comparison operators.
221 inline bool operator==(const Key& other) const {
222 return key_ == other.key_;
223 }
224 inline bool operator!=(const Key& other) const {
225 return key_ != other.key_;
226 }
227
228 friend class UpsertContext;
229 friend class ReadContext;
230
231 private:
232 uint16_t key_;
233 };
234
235 class Value {
236 public:
237 Value()
238 : value_{ 0 } {
239 }
240 Value(const Value& other)
241 : value_{ other.value_ } {
242 }
243 Value(uint16_t value)
244 : value_{ value } {
245 }
246
247 inline static constexpr uint32_t size() {
248 return static_cast<uint32_t>(sizeof(Value));
249 }
250
251 friend class UpsertContext;
252 friend class ReadContext;
253
254 private:
255 union {
256 uint16_t value_;
257 std::atomic<uint16_t> atomic_value_;
258 };
259 };
260
261 class UpsertContext : public IAsyncContext {
262 public:
263 typedef Key key_t;
264 typedef Value value_t;
265
266 UpsertContext(uint16_t key)
267 : key_{ key } {
268 }
269
270 /// Copy (and deep-copy) constructor.
271 UpsertContext(const UpsertContext& other)
272 : key_{ other.key_ } {
273 }
274
275 /// The implicit and explicit interfaces require a key() accessor.
276 inline const Key& key() const {
277 return key_;
278 }
279 inline static constexpr uint32_t value_size() {
280 return sizeof(value_t);
281 }
282 /// Non-atomic and atomic Put() methods.
283 inline void Put(Value& value) {
284 value.value_ = key_.key_;
285 }
286 inline bool PutAtomic(Value& value) {
287 value.atomic_value_.store(key_.key_);
288 return true;
289 }
290
291 protected:
292 /// The explicit interface requires a DeepCopy_Internal() implementation.
293 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
294 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
295 }
296
297 private:
298 Key key_;
299 };
300
301 class ReadContext : public IAsyncContext {
302 public:
303 typedef Key key_t;
304 typedef Value value_t;
305
306 ReadContext(uint16_t key)
307 : key_{ key } {
308 }
309
310 /// Copy (and deep-copy) constructor.
311 ReadContext(const ReadContext& other)
312 : key_{ other.key_ } {
313 }
314
315 /// The implicit and explicit interfaces require a key() accessor.
316 inline const Key& key() const {
317 return key_;
318 }
319
320 inline void Get(const Value& value) {
321 // All reads should be atomic (from the mutable tail).
322 ASSERT_TRUE(false);
323 }
324 inline void GetAtomic(const Value& value) {
325 output = value.atomic_value_.load();
326 }
327
328 protected:
329 /// The explicit interface requires a DeepCopy_Internal() implementation.
330 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
331 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
332 }
333
334 private:
335 Key key_;
336 public:
337 uint16_t output;
338 };
339
340 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 128, 1073741824, "" };
341
342 store.StartSession();
343
344 // Insert.
345 for(uint16_t idx = 0; idx < 10000; ++idx) {
346 auto callback = [](IAsyncContext* ctxt, Status result) {
347 // In-memory test.
348 ASSERT_TRUE(false);
349 };
350 UpsertContext context{ idx };
351 Status result = store.Upsert(context, callback, 1);
352 ASSERT_EQ(Status::Ok, result);
353 }
354 // Read.
355 for(uint16_t idx = 0; idx < 10000; ++idx) {
356 auto callback = [](IAsyncContext* ctxt, Status result) {
357 // In-memory test.
358 ASSERT_TRUE(false);
359 };
360 ReadContext context{ idx };
361 Status result = store.Read(context, callback, 1);
362 ASSERT_EQ(Status::Ok, result);
    // All the upserts should have been inserts (non-atomic); each value equals its key.
364 ASSERT_EQ(idx, context.output);
365 }
366
367 store.StopSession();
368}
369
370TEST(InMemFaster, UpsertRead_Concurrent) {
371 class Key {
372 public:
373 Key(uint32_t key)
374 : key_{ key } {
375 }
376
377 inline static constexpr uint32_t size() {
378 return static_cast<uint32_t>(sizeof(Key));
379 }
380 inline KeyHash GetHash() const {
381 std::hash<uint32_t> hash_fn;
382 return KeyHash{ hash_fn(key_) };
383 }
384
385 /// Comparison operators.
386 inline bool operator==(const Key& other) const {
387 return key_ == other.key_;
388 }
389 inline bool operator!=(const Key& other) const {
390 return key_ != other.key_;
391 }
392
393 private:
394 uint32_t key_;
395 };
396
397 class UpsertContext;
398 class ReadContext;
399
400 class alignas(16) Value {
401 public:
402 Value()
403 : length_{ 0 }
404 , value_{ 0 } {
405 }
406
407 inline static constexpr uint32_t size() {
408 return static_cast<uint32_t>(sizeof(Value));
409 }
410
411 friend class UpsertContext;
412 friend class ReadContext;
413
414 private:
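    // length_ doubles as a spin lock: PutAtomic() stores UINT8_MAX while it rewrites value_.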
415 uint8_t value_[31];
416 std::atomic<uint8_t> length_;
417 };
418
419 class UpsertContext : public IAsyncContext {
420 public:
421 typedef Key key_t;
422 typedef Value value_t;
423
424 UpsertContext(uint32_t key)
425 : key_{ key } {
426 }
427
428 /// Copy (and deep-copy) constructor.
429 UpsertContext(const UpsertContext& other)
430 : key_{ other.key_ } {
431 }
432
433 /// The implicit and explicit interfaces require a key() accessor.
434 inline const Key& key() const {
435 return key_;
436 }
437 inline static constexpr uint32_t value_size() {
438 return sizeof(value_t);
439 }
440 /// Non-atomic and atomic Put() methods.
441 inline void Put(Value& value) {
442 value.length_ = 5;
443 std::memset(value.value_, 23, 5);
444 }
445 inline bool PutAtomic(Value& value) {
446 // Get the lock on the value.
447 bool success;
448 do {
449 uint8_t expected_length;
450 do {
          // Spin until the other thread releases the lock.
452 expected_length = value.length_.load();
453 } while(expected_length == UINT8_MAX);
454 // Try to get the lock.
455 success = value.length_.compare_exchange_weak(expected_length, UINT8_MAX);
456 } while(!success);
457
458 std::memset(value.value_, 42, 7);
459 value.length_.store(7);
460 return true;
461 }
462
463 protected:
464 /// The explicit interface requires a DeepCopy_Internal() implementation.
465 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
466 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
467 }
468
469 private:
470 Key key_;
471 };
472
473 class ReadContext : public IAsyncContext {
474 public:
475 typedef Key key_t;
476 typedef Value value_t;
477
478 ReadContext(uint32_t key)
479 : key_{ key } {
480 }
481
482 /// Copy (and deep-copy) constructor.
483 ReadContext(const ReadContext& other)
484 : key_{ other.key_ } {
485 }
486
487 /// The implicit and explicit interfaces require a key() accessor.
488 inline const Key& key() const {
489 return key_;
490 }
491
492 inline void Get(const Value& value) {
493 // All reads should be atomic (from the mutable tail).
494 ASSERT_TRUE(false);
495 }
496 inline void GetAtomic(const Value& value) {
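      // Optimistic read: retry until the length observed before and after the copy matches,
      // i.e. no concurrent writer slipped in.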
497 do {
498 output_length = value.length_.load();
499 ASSERT_EQ(0, reinterpret_cast<size_t>(value.value_) % 16);
500 output_pt1 = *reinterpret_cast<const uint64_t*>(value.value_);
501 output_pt2 = *reinterpret_cast<const uint64_t*>(value.value_ + 8);
502 } while(output_length != value.length_.load());
503 }
504
505 protected:
506 /// The explicit interface requires a DeepCopy_Internal() implementation.
507 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
508 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
509 }
510
511 private:
512 Key key_;
513 public:
514 uint8_t output_length;
515 uint64_t output_pt1;
516 uint64_t output_pt2;
517 };
518
519 static constexpr size_t kNumOps = 1024;
520 static constexpr size_t kNumThreads = 2;
521
522 auto upsert_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
523 size_t thread_idx) {
524 store_->StartSession();
525
526 for(size_t idx = 0; idx < kNumOps; ++idx) {
527 auto callback = [](IAsyncContext* ctxt, Status result) {
528 // In-memory test.
529 ASSERT_TRUE(false);
530 };
531 UpsertContext context{ static_cast<uint32_t>((thread_idx * kNumOps) + idx) };
532 Status result = store_->Upsert(context, callback, 1);
533 ASSERT_EQ(Status::Ok, result);
534 }
535
536 store_->StopSession();
537 };
538
539 auto read_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
540 size_t thread_idx, uint64_t expected_value) {
541 store_->StartSession();
542
543 for(size_t idx = 0; idx < kNumOps; ++idx) {
544 auto callback = [](IAsyncContext* ctxt, Status result) {
545 // In-memory test.
546 ASSERT_TRUE(false);
547 };
548 ReadContext context{ static_cast<uint32_t>((thread_idx * kNumOps) + idx) };
549 Status result = store_->Read(context, callback, 1);
550 ASSERT_EQ(Status::Ok, result);
551 ASSERT_EQ(expected_value, context.output_pt1);
552 }
553
554 store_->StopSession();
555 };
556
557 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 128, 1073741824, "" };
558
559 // Insert.
560 std::deque<std::thread> threads{};
561 for(size_t idx = 0; idx < kNumThreads; ++idx) {
562 threads.emplace_back(upsert_worker, &store, idx);
563 }
564 for(auto& thread : threads) {
565 thread.join();
566 }
567
568 // Read.
569 threads.clear();
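  // Put() stored five 0x17 (= 23) bytes, so the low five bytes of output_pt1 should read back as 0x1717171717.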
570 for(size_t idx = 0; idx < kNumThreads; ++idx) {
571 threads.emplace_back(read_worker, &store, idx, 0x1717171717);
572 }
573 for(auto& thread : threads) {
574 thread.join();
575 }
576
577 // Update.
578 threads.clear();
579 for(size_t idx = 0; idx < kNumThreads; ++idx) {
580 threads.emplace_back(upsert_worker, &store, idx);
581 }
582 for(auto& thread : threads) {
583 thread.join();
584 }
585
586 // Read again.
587 threads.clear();
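  // PutAtomic() overwrote each value with seven 0x2a (= 42) bytes, hence the expected 0x2a2a2a2a2a2a2a.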
588 for(size_t idx = 0; idx < kNumThreads; ++idx) {
589 threads.emplace_back(read_worker, &store, idx, 0x2a2a2a2a2a2a2a);
590 }
591 for(auto& thread : threads) {
592 thread.join();
593 }
594}
595
596TEST(InMemFaster, UpsertRead_ResizeValue_Concurrent) {
597 class Key {
598 public:
599 Key(uint32_t key)
600 : key_{ key } {
601 }
602
603 inline static constexpr uint32_t size() {
604 return static_cast<uint32_t>(sizeof(Key));
605 }
606 inline KeyHash GetHash() const {
607 std::hash<uint32_t> hash_fn;
608 return KeyHash{ hash_fn(key_) };
609 }
610
611 /// Comparison operators.
612 inline bool operator==(const Key& other) const {
613 return key_ == other.key_;
614 }
615 inline bool operator!=(const Key& other) const {
616 return key_ != other.key_;
617 }
618
619 private:
620 uint32_t key_;
621 };
622
623 class UpsertContext;
624 class ReadContext;
625
626 class GenLock {
627 public:
628 GenLock()
629 : control_{ 0 } {
630 }
631 GenLock(uint64_t control)
632 : control_{ control } {
633 }
634 inline GenLock& operator=(const GenLock& other) {
635 control_ = other.control_;
636 return *this;
637 }
638
639 union {
640 struct {
641 uint64_t gen_number : 62;
642 uint64_t locked : 1;
643 uint64_t replaced : 1;
644 };
645 uint64_t control_;
646 };
647 };
648 static_assert(sizeof(GenLock) == 8, "sizeof(GenLock) != 8");
649
650 class AtomicGenLock {
651 public:
652 AtomicGenLock()
653 : control_{ 0 } {
654 }
655 AtomicGenLock(uint64_t control)
656 : control_{ control } {
657 }
658
659 inline GenLock load() const {
660 return GenLock{ control_.load() };
661 }
662 inline void store(GenLock desired) {
663 control_.store(desired.control_);
664 }
665
666 inline bool try_lock(bool& replaced) {
667 replaced = false;
668 GenLock expected{ control_.load() };
669 expected.locked = 0;
670 expected.replaced = 0;
671 GenLock desired{ expected.control_ };
672 desired.locked = 1;
673
674 if(control_.compare_exchange_strong(expected.control_, desired.control_)) {
675 return true;
676 }
677 if(expected.replaced) {
678 replaced = true;
679 }
680 return false;
681 }
682 inline void unlock(bool replaced) {
      if(!replaced) {
684 // Just turn off "locked" bit and increase gen number.
685 uint64_t sub_delta = ((uint64_t)1 << 62) - 1;
686 control_.fetch_sub(sub_delta);
687 } else {
688 // Turn off "locked" bit, turn on "replaced" bit, and increase gen number
689 uint64_t add_delta = ((uint64_t)1 << 63) - ((uint64_t)1 << 62) + 1;
690 control_.fetch_add(add_delta);
691 }
692 }
693
694 private:
695 std::atomic<uint64_t> control_;
696 };
697 static_assert(sizeof(AtomicGenLock) == 8, "sizeof(AtomicGenLock) != 8");
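  // Protocol: writers set the "locked" bit, mutate the value, and bump gen_number on unlock;
  // the "replaced" bit marks records that have been superseded by a larger copy. Readers
  // retry until they observe the same gen_number before and after reading.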
698
699 class Value {
700 public:
701 Value()
702 : gen_lock_{ 0 }
703 , size_{ 0 }
704 , length_{ 0 } {
705 }
706
707 inline uint32_t size() const {
708 return size_;
709 }
710
711 friend class UpsertContext;
712 friend class ReadContext;
713
714 private:
715 AtomicGenLock gen_lock_;
716 uint32_t size_;
717 uint32_t length_;
718
719 inline const uint8_t* buffer() const {
720 return reinterpret_cast<const uint8_t*>(this + 1);
721 }
722 inline uint8_t* buffer() {
723 return reinterpret_cast<uint8_t*>(this + 1);
724 }
725 };
726
727 class UpsertContext : public IAsyncContext {
728 public:
729 typedef Key key_t;
730 typedef Value value_t;
731
732 UpsertContext(uint32_t key, uint32_t length)
733 : key_{ key }
734 , length_{ length } {
735 }
736
737 /// Copy (and deep-copy) constructor.
738 UpsertContext(const UpsertContext& other)
739 : key_{ other.key_ }
740 , length_{ other.length_ } {
741 }
742
743 /// The implicit and explicit interfaces require a key() accessor.
744 inline const Key& key() const {
745 return key_;
746 }
747 inline uint32_t value_size() const {
748 return sizeof(Value) + length_;
749 }
750 /// Non-atomic and atomic Put() methods.
751 inline void Put(Value& value) {
752 value.gen_lock_.store(0);
753 value.size_ = sizeof(Value) + length_;
754 value.length_ = length_;
755 std::memset(value.buffer(), 88, length_);
756 }
757 inline bool PutAtomic(Value& value) {
758 bool replaced;
759 while(!value.gen_lock_.try_lock(replaced) && !replaced) {
760 std::this_thread::yield();
761 }
762 if(replaced) {
763 // Some other thread replaced this record.
764 return false;
765 }
766 if(value.size_ < sizeof(Value) + length_) {
767 // Current value is too small for in-place update.
768 value.gen_lock_.unlock(true);
769 return false;
770 }
771 // In-place update overwrites length and buffer, but not size.
772 value.length_ = length_;
773 std::memset(value.buffer(), 88, length_);
774 value.gen_lock_.unlock(false);
775 return true;
776 }
777
778 protected:
779 /// The explicit interface requires a DeepCopy_Internal() implementation.
780 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
781 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
782 }
783
784 private:
785 Key key_;
786 uint32_t length_;
787 };
788
789 class ReadContext : public IAsyncContext {
790 public:
791 typedef Key key_t;
792 typedef Value value_t;
793
794 ReadContext(uint32_t key)
795 : key_{ key }
796 , output_length{ 0 } {
797 }
798
799 /// Copy (and deep-copy) constructor.
800 ReadContext(const ReadContext& other)
801 : key_{ other.key_ }
802 , output_length{ 0 } {
803 }
804
805 /// The implicit and explicit interfaces require a key() accessor.
806 inline const Key& key() const {
807 return key_;
808 }
809
810 inline void Get(const Value& value) {
811 // All reads should be atomic (from the mutable tail).
812 ASSERT_TRUE(false);
813 }
814 inline void GetAtomic(const Value& value) {
815 GenLock before, after;
816 do {
817 before = value.gen_lock_.load();
818 output_length = value.length_;
819 output_bytes[0] = value.buffer()[0];
820 output_bytes[1] = value.buffer()[value.length_ - 1];
821 after = value.gen_lock_.load();
822 } while(before.gen_number != after.gen_number);
823 }
824
825 protected:
826 /// The explicit interface requires a DeepCopy_Internal() implementation.
827 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
828 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
829 }
830
831 private:
832 Key key_;
833 public:
834 uint8_t output_length;
835 // Extract two bytes of output.
836 uint8_t output_bytes[2];
837 };
838
839 static constexpr size_t kNumOps = 1024;
840 static constexpr size_t kNumThreads = 2;
841
842 auto upsert_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
843 size_t thread_idx, uint32_t value_length) {
844 store_->StartSession();
845
846 for(size_t idx = 0; idx < kNumOps; ++idx) {
847 auto callback = [](IAsyncContext* ctxt, Status result) {
848 // In-memory test.
849 ASSERT_TRUE(false);
850 };
851 UpsertContext context{ static_cast<uint32_t>((thread_idx * kNumOps) + idx), value_length };
852 Status result = store_->Upsert(context, callback, 1);
853 ASSERT_EQ(Status::Ok, result);
854 }
855
856 store_->StopSession();
857 };
858
859 auto read_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
860 size_t thread_idx, uint8_t expected_value) {
861 store_->StartSession();
862
863 for(size_t idx = 0; idx < kNumOps; ++idx) {
864 auto callback = [](IAsyncContext* ctxt, Status result) {
865 // In-memory test.
866 ASSERT_TRUE(false);
867 };
868 ReadContext context{ static_cast<uint32_t>((thread_idx * kNumOps) + idx) };
869 Status result = store_->Read(context, callback, 1);
870 ASSERT_EQ(Status::Ok, result);
871 ASSERT_EQ(expected_value, context.output_bytes[0]);
872 ASSERT_EQ(expected_value, context.output_bytes[1]);
873 }
874
875 store_->StopSession();
876 };
877
878 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 128, 1073741824, "" };
879
880 // Insert.
881 std::deque<std::thread> threads{};
882 for(size_t idx = 0; idx < kNumThreads; ++idx) {
883 threads.emplace_back(upsert_worker, &store, idx, 7);
884 }
885 for(auto& thread : threads) {
886 thread.join();
887 }
888
889 // Read.
890 threads.clear();
891 for(size_t idx = 0; idx < kNumThreads; ++idx) {
892 threads.emplace_back(read_worker, &store, idx, 88);
893 }
894 for(auto& thread : threads) {
895 thread.join();
896 }
897
898 // Update.
899 threads.clear();
900 for(size_t idx = 0; idx < kNumThreads; ++idx) {
901 threads.emplace_back(upsert_worker, &store, idx, 11);
902 }
903 for(auto& thread : threads) {
904 thread.join();
905 }
906
907 // Read again.
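  // The 11-byte upserts did not fit into the existing 7-byte records, so PutAtomic() returned
  // false and Upsert() wrote fresh records via Put(); every byte is still 88.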
908 threads.clear();
909 for(size_t idx = 0; idx < kNumThreads; ++idx) {
910 threads.emplace_back(read_worker, &store, idx, 88);
911 }
912 for(auto& thread : threads) {
913 thread.join();
914 }
915}

TEST(InMemFaster, Rmw) {
917 class Key {
918 public:
919 Key(uint64_t key)
920 : key_{ key } {
921 }
922
923 inline static constexpr uint32_t size() {
924 return static_cast<uint32_t>(sizeof(Key));
925 }
926 inline KeyHash GetHash() const {
927 std::hash<uint64_t> hash_fn;
928 return KeyHash{ hash_fn(key_) };
929 }
930
931 /// Comparison operators.
932 inline bool operator==(const Key& other) const {
933 return key_ == other.key_;
934 }
935 inline bool operator!=(const Key& other) const {
936 return key_ != other.key_;
937 }
938
939 private:
940 uint64_t key_;
941 };
942
943 class RmwContext;
944 class ReadContext;
945
946 class Value {
947 public:
948 Value()
949 : value_{ 0 } {
950 }
951 Value(const Value& other)
952 : value_{ other.value_ } {
953 }
954
955 inline static constexpr uint32_t size() {
956 return static_cast<uint32_t>(sizeof(Value));
957 }
958
959 friend class RmwContext;
960 friend class ReadContext;
961
962 private:
963 union {
964 int32_t value_;
965 std::atomic<int32_t> atomic_value_;
966 };
967 };
968
969 class RmwContext : public IAsyncContext {
970 public:
971 typedef Key key_t;
972 typedef Value value_t;
973
974 RmwContext(uint64_t key, int32_t incr)
975 : key_{ key }
976 , incr_{ incr } {
977 }
978
979 /// Copy (and deep-copy) constructor.
980 RmwContext(const RmwContext& other)
981 : key_{ other.key_ }
982 , incr_{ other.incr_ } {
983 }
984
985 /// The implicit and explicit interfaces require a key() accessor.
986 inline const Key& key() const {
987 return key_;
988 }
989 inline static constexpr uint32_t value_size() {
990 return sizeof(value_t);
991 }
992 inline void RmwInitial(Value& value) {
993 value.value_ = incr_;
994 }
995 inline void RmwCopy(const Value& old_value, Value& value) {
996 value.value_ = old_value.value_ + incr_;
997 }
998 inline bool RmwAtomic(Value& value) {
999 value.atomic_value_.fetch_add(incr_);
1000 return true;
1001 }
1002
1003 protected:
1004 /// The explicit interface requires a DeepCopy_Internal() implementation.
1005 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1006 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1007 }
1008
1009 private:
1010 int32_t incr_;
1011 Key key_;
1012 };
1013
1014 class ReadContext : public IAsyncContext {
1015 public:
1016 typedef Key key_t;
1017 typedef Value value_t;
1018
1019 ReadContext(uint64_t key)
1020 : key_{ key } {
1021 }
1022
1023 /// Copy (and deep-copy) constructor.
1024 ReadContext(const ReadContext& other)
1025 : key_{ other.key_ } {
1026 }
1027
1028 /// The implicit and explicit interfaces require a key() accessor.
1029 inline const Key& key() const {
1030 return key_;
1031 }
1032
1033 inline void Get(const Value& value) {
1034 // All reads should be atomic (from the mutable tail).
1035 ASSERT_TRUE(false);
1036 }
1037 inline void GetAtomic(const Value& value) {
1038 output = value.atomic_value_.load();
1039 }
1040
1041 protected:
1042 /// The explicit interface requires a DeepCopy_Internal() implementation.
1043 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1044 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1045 }
1046
1047 private:
1048 Key key_;
1049 public:
1050 int32_t output;
1051 };
1052
1053 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };
1054
1055 store.StartSession();
1056
1057 // Rmw, increment by 1.
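  // 2048 RMWs over 512 distinct keys: the first RMW of a key goes through RmwInitial(),
  // the later ones through RmwAtomic().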
1058 for(size_t idx = 0; idx < 2048; ++idx) {
1059 auto callback = [](IAsyncContext* ctxt, Status result) {
1060 // In-memory test.
1061 ASSERT_TRUE(false);
1062 };
1063 RmwContext context{ idx % 512, 1 };
1064 Status result = store.Rmw(context, callback, 1);
1065 ASSERT_EQ(Status::Ok, result);
1066 }
1067 // Read.
1068 for(size_t idx = 0; idx < 512; ++idx) {
1069 auto callback = [](IAsyncContext* ctxt, Status result) {
1070 // In-memory test.
1071 ASSERT_TRUE(false);
1072 };
1073 ReadContext context{ idx };
1074 Status result = store.Read(context, callback, 1);
1075 ASSERT_EQ(Status::Ok, result) << idx;
    // 2048 RMWs over 512 keys: each key was incremented by 1, four times.
1077 ASSERT_EQ(4, context.output);
1078 }
1079 // Rmw, decrement by 1.
1080 for(size_t idx = 0; idx < 2048; ++idx) {
1081 auto callback = [](IAsyncContext* ctxt, Status result) {
1082 // In-memory test.
1083 ASSERT_TRUE(false);
1084 };
1085 RmwContext context{ idx % 512, -1 };
1086 Status result = store.Rmw(context, callback, 1);
1087 ASSERT_EQ(Status::Ok, result);
1088 }
1089 // Read again.
1090 for(size_t idx = 0; idx < 512; ++idx) {
1091 auto callback = [](IAsyncContext* ctxt, Status result) {
1092 // In-memory test.
1093 ASSERT_TRUE(false);
1094 };
    ReadContext context{ idx };
1096 Status result = store.Read(context, callback, 1);
1097 ASSERT_EQ(Status::Ok, result);
    // The 2048 decrements cancel the 2048 increments, so every value is back to 0.
1099 ASSERT_EQ(0, context.output);
1100 }
1101
1102 store.StopSession();
1103}
1104
1105TEST(InMemFaster, Rmw_Concurrent) {
1106 class Key {
1107 public:
1108 Key(uint64_t key)
1109 : key_{ key } {
1110 }
1111
1112 inline static constexpr uint32_t size() {
1113 return static_cast<uint32_t>(sizeof(Key));
1114 }
1115 inline KeyHash GetHash() const {
1116 std::hash<uint64_t> hash_fn;
1117 return KeyHash{ hash_fn(key_) };
1118 }
1119
1120 /// Comparison operators.
1121 inline bool operator==(const Key& other) const {
1122 return key_ == other.key_;
1123 }
1124 inline bool operator!=(const Key& other) const {
1125 return key_ != other.key_;
1126 }
1127
1128 private:
1129 uint64_t key_;
1130 };
1131
1132 class RmwContext;
1133 class ReadContext;
1134
1135 class Value {
1136 public:
1137 Value()
1138 : value_{ 0 } {
1139 }
1140 Value(const Value& other)
1141 : value_{ other.value_ } {
1142 }
1143
1144 inline static constexpr uint32_t size() {
1145 return static_cast<uint32_t>(sizeof(Value));
1146 }
1147
1148 friend class RmwContext;
1149 friend class ReadContext;
1150
1151 private:
1152 union {
1153 int64_t value_;
1154 std::atomic<int64_t> atomic_value_;
1155 };
1156 };
1157
1158 class RmwContext : public IAsyncContext {
1159 public:
1160 typedef Key key_t;
1161 typedef Value value_t;
1162
1163 RmwContext(uint64_t key, int64_t incr)
1164 : key_{ key }
1165 , incr_{ incr } {
1166 }
1167
1168 /// Copy (and deep-copy) constructor.
1169 RmwContext(const RmwContext& other)
1170 : key_{ other.key_ }
1171 , incr_{ other.incr_ } {
1172 }
1173
1174 /// The implicit and explicit interfaces require a key() accessor.
1175 inline const Key& key() const {
1176 return key_;
1177 }
1178 inline static constexpr uint32_t value_size() {
1179 return sizeof(value_t);
1180 }
1181
1182 inline void RmwInitial(Value& value) {
1183 value.value_ = incr_;
1184 }
1185 inline void RmwCopy(const Value& old_value, Value& value) {
1186 value.value_ = old_value.value_ + incr_;
1187 }
1188 inline bool RmwAtomic(Value& value) {
1189 value.atomic_value_.fetch_add(incr_);
1190 return true;
1191 }
1192
1193 protected:
1194 /// The explicit interface requires a DeepCopy_Internal() implementation.
1195 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1196 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1197 }
1198
1199 private:
1200 int64_t incr_;
1201 Key key_;
1202 };
1203
1204 class ReadContext : public IAsyncContext {
1205 public:
1206 typedef Key key_t;
1207 typedef Value value_t;
1208
1209 ReadContext(uint64_t key)
1210 : key_{ key } {
1211 }
1212
1213 /// Copy (and deep-copy) constructor.
1214 ReadContext(const ReadContext& other)
1215 : key_{ other.key_ } {
1216 }
1217
1218 /// The implicit and explicit interfaces require a key() accessor.
1219 inline const Key& key() const {
1220 return key_;
1221 }
1222
1223 inline void Get(const Value& value) {
1224 // All reads should be atomic (from the mutable tail).
1225 ASSERT_TRUE(false);
1226 }
1227 inline void GetAtomic(const Value& value) {
1228 output = value.atomic_value_.load();
1229 }
1230
1231 protected:
1232 /// The explicit interface requires a DeepCopy_Internal() implementation.
1233 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1234 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1235 }
1236
1237 private:
1238 Key key_;
1239 public:
1240 int64_t output;
1241 };
1242
1243 static constexpr size_t kNumThreads = 2;
1244 static constexpr size_t kNumRmws = 2048;
1245 static constexpr size_t kRange = 512;
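  // Every worker touches each key in [0, kRange) exactly kNumRmws / kRange times.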
1246
1247 auto rmw_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
1248 int64_t incr) {
1249 store_->StartSession();
1250
1251 for(size_t idx = 0; idx < kNumRmws; ++idx) {
1252 auto callback = [](IAsyncContext* ctxt, Status result) {
1253 // In-memory test.
1254 ASSERT_TRUE(false);
1255 };
1256 RmwContext context{ idx % kRange, incr };
1257 Status result = store_->Rmw(context, callback, 1);
1258 ASSERT_EQ(Status::Ok, result);
1259 }
1260
1261 store_->StopSession();
1262 };
1263
1264 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };
1265
1266 // Rmw, increment by 2 * idx.
1267 std::deque<std::thread> threads{};
1268 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
1269 threads.emplace_back(rmw_worker, &store, 2 * idx);
1270 }
1271 for(auto& thread : threads) {
1272 thread.join();
1273 }
1274
1275 // Read.
1276 store.StartSession();
1277
1278 for(size_t idx = 0; idx < kRange; ++idx) {
1279 auto callback = [](IAsyncContext* ctxt, Status result) {
1280 // In-memory test.
1281 ASSERT_TRUE(false);
1282 };
1283 ReadContext context{ idx };
1284 Status result = store.Read(context, callback, 1);
1285 ASSERT_EQ(Status::Ok, result) << idx;
    // Worker t added 2 * t to each key, (kNumRmws / kRange) times, for a total of
    // kNumThreads * (kNumThreads - 1) * (kNumRmws / kRange).
1287 ASSERT_EQ((kNumThreads * (kNumThreads - 1)) * (kNumRmws / kRange), context.output);
1288 }
1289
1290 store.StopSession();
1291
1292 // Rmw, decrement by idx.
1293 threads.clear();
1294 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
1295 threads.emplace_back(rmw_worker, &store, -idx);
1296 }
1297 for(auto& thread : threads) {
1298 thread.join();
1299 }
1300
1301 // Read again.
1302 store.StartSession();
1303
1304 for(size_t idx = 0; idx < kRange; ++idx) {
1305 auto callback = [](IAsyncContext* ctxt, Status result) {
1306 // In-memory test.
1307 ASSERT_TRUE(false);
1308 };
    ReadContext context{ idx };
1310 Status result = store.Read(context, callback, 1);
1311 ASSERT_EQ(Status::Ok, result);
    // The decrement pass removed half of what the increment pass added (sum of t vs. sum of 2 * t).
1313 ASSERT_EQ(((kNumThreads * (kNumThreads - 1)) / 2) * (kNumRmws / kRange), context.output);
1314 }
1315
1316 store.StopSession();
1317}
1318
1319TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
1320 class Key {
1321 public:
1322 Key(uint64_t key)
1323 : key_{ key } {
1324 }
1325
1326 inline static constexpr uint32_t size() {
1327 return static_cast<uint32_t>(sizeof(Key));
1328 }
1329 inline KeyHash GetHash() const {
1330 std::hash<uint64_t> hash_fn;
1331 return KeyHash{ hash_fn(key_) };
1332 }
1333
1334 /// Comparison operators.
1335 inline bool operator==(const Key& other) const {
1336 return key_ == other.key_;
1337 }
1338 inline bool operator!=(const Key& other) const {
1339 return key_ != other.key_;
1340 }
1341
1342 private:
1343 uint64_t key_;
1344 };
1345
1346 class RmwContext;
1347 class ReadContext;
1348
1349 class GenLock {
1350 public:
1351 GenLock()
1352 : control_{ 0 } {
1353 }
1354 GenLock(uint64_t control)
1355 : control_{ control } {
1356 }
1357 inline GenLock& operator=(const GenLock& other) {
1358 control_ = other.control_;
1359 return *this;
1360 }
1361
1362 union {
1363 struct {
1364 uint64_t gen_number : 62;
1365 uint64_t locked : 1;
1366 uint64_t replaced : 1;
1367 };
1368 uint64_t control_;
1369 };
1370 };
1371 static_assert(sizeof(GenLock) == 8, "sizeof(GenLock) != 8");
1372
1373 class AtomicGenLock {
1374 public:
1375 AtomicGenLock()
1376 : control_{ 0 } {
1377 }
1378 AtomicGenLock(uint64_t control)
1379 : control_{ control } {
1380 }
1381
1382 inline GenLock load() const {
1383 return GenLock{ control_.load() };
1384 }
1385 inline void store(GenLock desired) {
1386 control_.store(desired.control_);
1387 }
1388
1389 inline bool try_lock(bool& replaced) {
1390 replaced = false;
1391 GenLock expected{ control_.load() };
1392 expected.locked = 0;
1393 expected.replaced = 0;
1394 GenLock desired{ expected.control_ };
1395 desired.locked = 1;
1396
1397 if(control_.compare_exchange_strong(expected.control_, desired.control_)) {
1398 return true;
1399 }
1400 if(expected.replaced) {
1401 replaced = true;
1402 }
1403 return false;
1404 }
1405 inline void unlock(bool replaced) {
      if(!replaced) {
1407 // Just turn off "locked" bit and increase gen number.
1408 uint64_t sub_delta = ((uint64_t)1 << 62) - 1;
1409 control_.fetch_sub(sub_delta);
1410 } else {
1411 // Turn off "locked" bit, turn on "replaced" bit, and increase gen number
1412 uint64_t add_delta = ((uint64_t)1 << 63) - ((uint64_t)1 << 62) + 1;
1413 control_.fetch_add(add_delta);
1414 }
1415 }
1416
1417 private:
1418 std::atomic<uint64_t> control_;
1419 };
1420 static_assert(sizeof(AtomicGenLock) == 8, "sizeof(AtomicGenLock) != 8");
1421
1422 class Value {
1423 public:
1424 Value()
1425 : gen_lock_{ 0 }
1426 , size_{ 0 }
1427 , length_{ 0 } {
1428 }
1429
1430 inline uint32_t size() const {
1431 return size_;
1432 }
1433
1434 friend class RmwContext;
1435 friend class ReadContext;
1436
1437 private:
1438 AtomicGenLock gen_lock_;
1439 uint32_t size_;
1440 uint32_t length_;
1441
1442 inline const int8_t* buffer() const {
1443 return reinterpret_cast<const int8_t*>(this + 1);
1444 }
1445 inline int8_t* buffer() {
1446 return reinterpret_cast<int8_t*>(this + 1);
1447 }
1448 };
1449
1450 class RmwContext : public IAsyncContext {
1451 public:
1452 typedef Key key_t;
1453 typedef Value value_t;
1454
1455 RmwContext(uint64_t key, int8_t incr, uint32_t length)
1456 : key_{ key }
1457 , incr_{ incr }
1458 , length_{ length } {
1459 }
1460
1461 /// Copy (and deep-copy) constructor.
1462 RmwContext(const RmwContext& other)
1463 : key_{ other.key_ }
1464 , incr_{ other.incr_ }
1465 , length_{ other.length_ } {
1466 }
1467
1468 /// The implicit and explicit interfaces require a key() accessor.
1469 inline const Key& key() const {
1470 return key_;
1471 }
1472 inline uint32_t value_size() const {
1473 return sizeof(value_t) + length_;
1474 }
1475
1476 inline void RmwInitial(Value& value) {
1477 value.gen_lock_.store(GenLock{});
1478 value.size_ = sizeof(Value) + length_;
1479 value.length_ = length_;
1480 std::memset(value.buffer(), incr_, length_);
1481 }
1482 inline void RmwCopy(const Value& old_value, Value& value) {
1483 value.gen_lock_.store(GenLock{});
1484 value.size_ = sizeof(Value) + length_;
1485 value.length_ = length_;
1486 std::memset(value.buffer(), incr_, length_);
1487 for(uint32_t idx = 0; idx < std::min(old_value.length_, length_); ++idx) {
1488 value.buffer()[idx] = old_value.buffer()[idx] + incr_;
1489 }
1490 }
1491 inline bool RmwAtomic(Value& value) {
1492 bool replaced;
1493 while(!value.gen_lock_.try_lock(replaced) && !replaced) {
1494 std::this_thread::yield();
1495 }
1496 if(replaced) {
1497 // Some other thread replaced this record.
1498 return false;
1499 }
1500 if(value.size_ < sizeof(Value) + length_) {
1501 // Current value is too small for in-place update.
1502 value.gen_lock_.unlock(true);
1503 return false;
1504 }
1505 // In-place update overwrites length and buffer, but not size.
1506 value.length_ = length_;
1507 for(uint32_t idx = 0; idx < length_; ++idx) {
1508 value.buffer()[idx] += incr_;
1509 }
1510 value.gen_lock_.unlock(false);
1511 return true;
1512 }
1513
1514 protected:
1515 /// The explicit interface requires a DeepCopy_Internal() implementation.
1516 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1517 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1518 }
1519
1520 private:
1521 int8_t incr_;
1522 uint32_t length_;
1523 Key key_;
1524 };
1525
1526 class ReadContext : public IAsyncContext {
1527 public:
1528 typedef Key key_t;
1529 typedef Value value_t;
1530
1531 ReadContext(uint64_t key)
1532 : key_{ key }
1533 , output_length{ 0 } {
1534 }
1535
1536 /// Copy (and deep-copy) constructor.
1537 ReadContext(const ReadContext& other)
1538 : key_{ other.key_ }
1539 , output_length{ 0 } {
1540 }
1541
1542 /// The implicit and explicit interfaces require a key() accessor.
1543 inline const Key& key() const {
1544 return key_;
1545 }
1546
1547 inline void Get(const Value& value) {
1548 // All reads should be atomic (from the mutable tail).
1549 ASSERT_TRUE(false);
1550 }
1551 inline void GetAtomic(const Value& value) {
1552 GenLock before, after;
1553 do {
1554 before = value.gen_lock_.load();
1555 output_length = value.length_;
1556 output_bytes[0] = value.buffer()[0];
1557 output_bytes[1] = value.buffer()[value.length_ - 1];
1558 after = value.gen_lock_.load();
1559 } while(before.gen_number != after.gen_number);
1560 }
1561
1562 protected:
1563 /// The explicit interface requires a DeepCopy_Internal() implementation.
1564 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1565 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1566 }
1567
1568 private:
1569 Key key_;
1570 public:
1571 uint8_t output_length;
1572 // Extract two bytes of output.
1573 int8_t output_bytes[2];
1574 };
1575
1576 static constexpr int8_t kNumThreads = 2;
1577 static constexpr size_t kNumRmws = 2048;
1578 static constexpr size_t kRange = 512;
1579
1580 auto rmw_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
1581 int8_t incr, uint32_t value_length) {
1582 store_->StartSession();
1583
1584 for(size_t idx = 0; idx < kNumRmws; ++idx) {
1585 auto callback = [](IAsyncContext* ctxt, Status result) {
1586 // In-memory test.
1587 ASSERT_TRUE(false);
1588 };
1589 RmwContext context{ idx % kRange, incr, value_length };
1590 Status result = store_->Rmw(context, callback, 1);
1591 ASSERT_EQ(Status::Ok, result);
1592 }
1593
1594 store_->StopSession();
1595 };
1596
1597 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };
1598
1599 // Rmw, increment by 3.
1600 std::deque<std::thread> threads{};
1601 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
1602 threads.emplace_back(rmw_worker, &store, 3, 5);
1603 }
1604 for(auto& thread : threads) {
1605 thread.join();
1606 }
1607
1608 // Read.
1609 store.StartSession();
1610
1611 for(size_t idx = 0; idx < kRange; ++idx) {
1612 auto callback = [](IAsyncContext* ctxt, Status result) {
1613 // In-memory test.
1614 ASSERT_TRUE(false);
1615 };
1616 ReadContext context{ idx };
1617 Status result = store.Read(context, callback, 1);
1618 ASSERT_EQ(Status::Ok, result) << idx;
    // Each key received kNumThreads * (kNumRmws / kRange) increments of 3, over a 5-byte value.
1620 ASSERT_EQ(5, context.output_length);
1621 ASSERT_EQ(kNumThreads * 4 * 3, context.output_bytes[0]);
1622 ASSERT_EQ(kNumThreads * 4 * 3, context.output_bytes[1]);
1623 }
1624
1625 store.StopSession();
1626
1627 // Rmw, decrement by 4.
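  // The value length grows from 5 to 8 bytes, so the first decrement of each key is applied
  // through RmwCopy() into a new record; subsequent ones are in-place RmwAtomic() updates.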
1628 threads.clear();
1629 for(int64_t idx = 0; idx < kNumThreads; ++idx) {
1630 threads.emplace_back(rmw_worker, &store, -4, 8);
1631 }
1632 for(auto& thread : threads) {
1633 thread.join();
1634 }
1635
1636 // Read again.
1637 store.StartSession();
1638
1639 for(size_t idx = 0; idx < kRange; ++idx) {
1640 auto callback = [](IAsyncContext* ctxt, Status result) {
1641 // In-memory test.
1642 ASSERT_TRUE(false);
1643 };
    ReadContext context{ idx };
1645 Status result = store.Read(context, callback, 1);
1646 ASSERT_EQ(Status::Ok, result);
    // After the resize, each key saw kNumThreads * 4 decrements of 4: the first byte goes from
    // 24 to -8; the last byte, created at -4 by the resizing copy, ends at -32.
1648 ASSERT_EQ(8, context.output_length);
1649 ASSERT_EQ(kNumThreads * -4, context.output_bytes[0]);
1650 ASSERT_EQ(kNumThreads * -16, context.output_bytes[1]);
1651 }
1652
1653 store.StopSession();
1654}
1655
1656TEST(InMemFaster, GrowHashTable) {
1657 class Key {
1658 public:
1659 Key(uint64_t key)
1660 : key_{ key } {
1661 }
1662
1663 inline static constexpr uint32_t size() {
1664 return static_cast<uint32_t>(sizeof(Key));
1665 }
1666 inline KeyHash GetHash() const {
1667 std::hash<uint64_t> hash_fn;
1668 return KeyHash{ hash_fn(key_) };
1669 }
1670
1671 /// Comparison operators.
1672 inline bool operator==(const Key& other) const {
1673 return key_ == other.key_;
1674 }
1675 inline bool operator!=(const Key& other) const {
1676 return key_ != other.key_;
1677 }
1678
1679 private:
1680 uint64_t key_;
1681 };
1682
1683 class RmwContext;
1684 class ReadContext;
1685
1686 class Value {
1687 public:
1688 Value()
1689 : value_{ 0 } {
1690 }
1691 Value(const Value& other)
1692 : value_{ other.value_ } {
1693 }
1694
1695 inline static constexpr uint32_t size() {
1696 return static_cast<uint32_t>(sizeof(Value));
1697 }
1698
1699 friend class RmwContext;
1700 friend class ReadContext;
1701
1702 private:
1703 union {
1704 int64_t value_;
1705 std::atomic<int64_t> atomic_value_;
1706 };
1707 };
1708
1709 class RmwContext : public IAsyncContext {
1710 public:
1711 typedef Key key_t;
1712 typedef Value value_t;
1713
1714 RmwContext(uint64_t key, int64_t incr)
1715 : key_{ key }
1716 , incr_{ incr } {
1717 }
1718
1719 /// Copy (and deep-copy) constructor.
1720 RmwContext(const RmwContext& other)
1721 : key_{ other.key_ }
1722 , incr_{ other.incr_ } {
1723 }
1724
1725 /// The implicit and explicit interfaces require a key() accessor.
1726 inline const Key& key() const {
1727 return key_;
1728 }
1729 inline static constexpr uint32_t value_size() {
1730 return sizeof(value_t);
1731 }
1732
1733 inline void RmwInitial(Value& value) {
1734 value.value_ = incr_;
1735 }
1736 inline void RmwCopy(const Value& old_value, Value& value) {
1737 value.value_ = old_value.value_ + incr_;
1738 }
1739 inline bool RmwAtomic(Value& value) {
1740 value.atomic_value_.fetch_add(incr_);
1741 return true;
1742 }
1743
1744 protected:
1745 /// The explicit interface requires a DeepCopy_Internal() implementation.
1746 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1747 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1748 }
1749
1750 private:
1751 int64_t incr_;
1752 Key key_;
1753 };
1754
1755 class ReadContext : public IAsyncContext {
1756 public:
1757 typedef Key key_t;
1758 typedef Value value_t;
1759
1760 ReadContext(uint64_t key)
1761 : key_{ key } {
1762 }
1763
1764 /// Copy (and deep-copy) constructor.
1765 ReadContext(const ReadContext& other)
1766 : key_{ other.key_ } {
1767 }
1768
1769 /// The implicit and explicit interfaces require a key() accessor.
1770 inline const Key& key() const {
1771 return key_;
1772 }
1773
1774 inline void Get(const Value& value) {
1775 // All reads should be atomic (from the mutable tail).
1776 ASSERT_TRUE(false);
1777 }
1778 inline void GetAtomic(const Value& value) {
1779 output = value.atomic_value_.load();
1780 }
1781
1782 protected:
1783 /// The explicit interface requires a DeepCopy_Internal() implementation.
1784 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
1785 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
1786 }
1787
1788 private:
1789 Key key_;
1790 public:
1791 int64_t output;
1792 };
1793
1794 static constexpr size_t kNumThreads = 2;
1795 static constexpr size_t kNumRmws = 32768;
1796 static constexpr size_t kRange = 8192;
1797
1798 static std::atomic<bool> grow_done{ false };
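  // Set by the GrowIndex() completion callback; every worker keeps its session refreshed until the grow finishes.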
1799
1800 auto rmw_worker0 = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
1801 int64_t incr) {
1802 auto callback = [](uint64_t new_size) {
1803 grow_done = true;
1804 };
1805
1806 store_->StartSession();
1807
1808 for(size_t idx = 0; idx < kNumRmws; ++idx) {
1809 auto callback = [](IAsyncContext* ctxt, Status result) {
1810 // In-memory test.
1811 ASSERT_TRUE(false);
1812 };
1813 RmwContext context{ idx % kRange, incr };
1814 Status result = store_->Rmw(context, callback, 1);
1815 ASSERT_EQ(Status::Ok, result);
1816 }
1817
1818 // Double the size of the index.
1819 store_->GrowIndex(callback);
1820
1821 while(!grow_done) {
1822 store_->Refresh();
1823 std::this_thread::yield();
1824 }
1825
1826 store_->StopSession();
1827 };
1828
1829 auto rmw_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_,
1830 int64_t incr) {
1831 store_->StartSession();
1832
1833 for(size_t idx = 0; idx < kNumRmws; ++idx) {
1834 auto callback = [](IAsyncContext* ctxt, Status result) {
1835 // In-memory test.
1836 ASSERT_TRUE(false);
1837 };
1838 RmwContext context{ idx % kRange, incr };
1839 Status result = store_->Rmw(context, callback, 1);
1840 ASSERT_EQ(Status::Ok, result);
1841 }
1842
1843 while(!grow_done) {
1844 store_->Refresh();
1845 std::this_thread::yield();
1846 }
1847
1848 store_->StopSession();
1849 };
1850
1851 FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };
1852
1853 // Rmw, increment by 2 * idx.
1854 std::deque<std::thread> threads{};
1855 threads.emplace_back(rmw_worker0, &store, 0);
1856 for(int64_t idx = 1; idx < kNumThreads; ++idx) {
1857 threads.emplace_back(rmw_worker, &store, 2 * idx);
1858 }
1859 for(auto& thread : threads) {
1860 thread.join();
1861 }
1862
1863 // Read.
1864 store.StartSession();
1865
1866 for(size_t idx = 0; idx < kRange; ++idx) {
1867 auto callback = [](IAsyncContext* ctxt, Status result) {
1868 // In-memory test.
1869 ASSERT_TRUE(false);
1870 };
1871 ReadContext context{ idx };
1872 Status result = store.Read(context, callback, 1);
1873 ASSERT_EQ(Status::Ok, result) << idx;
    // rmw_worker0 added 0 and rmw_worker idx added 2 * idx to each key, (kNumRmws / kRange) times.
1875 ASSERT_EQ((kNumThreads * (kNumThreads - 1)) * (kNumRmws / kRange), context.output);
1876 }
1877
1878 store.StopSession();
1879
1880 // Rmw, decrement by idx.
1881 grow_done = false;
1882 threads.clear();
1883 threads.emplace_back(rmw_worker0, &store, 0);
1884 for(int64_t idx = 1; idx < kNumThreads; ++idx) {
1885 threads.emplace_back(rmw_worker, &store, -idx);
1886 }
1887 for(auto& thread : threads) {
1888 thread.join();
1889 }
1890
1891 // Read again.
1892 store.StartSession();
1893
1894 for(size_t idx = 0; idx < kRange; ++idx) {
1895 auto callback = [](IAsyncContext* ctxt, Status result) {
1896 // In-memory test.
1897 ASSERT_TRUE(false);
1898 };
    ReadContext context{ idx };
1900 Status result = store.Read(context, callback, 1);
1901 ASSERT_EQ(Status::Ok, result);
    // The decrement pass removed half of what the increment pass added, leaving half the original total.
1903 ASSERT_EQ(((kNumThreads * (kNumThreads - 1)) / 2) * (kNumRmws / kRange), context.output);
1904 }
1905
1906 store.StopSession();
1907}
1908
1909TEST(InMemFaster, UpsertRead_VariableLengthKey) {
1910 class Key {
1911 public:
    /// This constructor is called when creating a context, so we just keep a pointer to the caller's key buffer.
1913 Key(const uint16_t* key, const uint64_t key_length)
1914 : temp_buffer{ key }
1915 , key_length_{ key_length } {
1916 }
1917
    /// This constructor is called when a record is being allocated on the log, so we copy the key into the record's own buffer.
1919 Key(const Key& other) {
1920 key_length_ = other.key_length_;
1921 temp_buffer = NULL;
1922 if (other.temp_buffer == NULL) {
1923 memcpy(buffer(), other.buffer(), key_length_);
1924 } else {
1925 memcpy(buffer(), other.temp_buffer, key_length_);
1926 }
1927 }
1928
    /// The destructor frees the temporary buffer, so Key objects that were never copied onto the HybridLog don't leak memory.
1930 ~Key() {
1931 if (this->temp_buffer != NULL) {
1932 free((void*)temp_buffer);
1933 }
1934 }
1935
1936 /// Methods and operators required by the (implicit) interface:
1937 inline uint32_t size() const {
1938 return static_cast<uint32_t>(sizeof(Key) + key_length_);
1939 }
1940 inline KeyHash GetHash() const {
1941 if (this->temp_buffer != NULL) {
1942 return KeyHash(Utility::HashBytes(temp_buffer, key_length_));
1943 }
1944 return KeyHash(Utility::HashBytes(buffer(), key_length_));
1945 }
1946
1947 /// Comparison operators.
1948 inline bool operator==(const Key& other) const {
1949 if (this->key_length_ != other.key_length_) return false;
1950 if (this->temp_buffer != NULL) {
1951 return memcmp(temp_buffer, other.buffer(), key_length_) == 0;
1952 }
1953 return memcmp(buffer(), other.buffer(), key_length_) == 0;
1954 }
1955 inline bool operator!=(const Key& other) const {
1956 if (this->key_length_ != other.key_length_) return true;
1957 if (this->temp_buffer != NULL) {
1958 return memcmp(temp_buffer, other.buffer(), key_length_) != 0;
1959 }
1960 return memcmp(buffer(), other.buffer(), key_length_) != 0;
1961 }
1962
1963 private:
1964 uint64_t key_length_;
1965 const uint16_t* temp_buffer;
1966
1967 inline const uint16_t* buffer() const {
1968 return reinterpret_cast<const uint16_t*>(this + 1);
1969 }
1970 inline uint16_t* buffer() {
1971 return reinterpret_cast<uint16_t*>(this + 1);
1972 }
1973 };
1974
1975 class UpsertContext;
1976 class ReadContext;
1977
1978 class Value {
1979 public:
1980 Value()
1981 : value_{ 0 } {
1982 }
1983 Value(const Value& other)
1984 : value_{ other.value_ } {
1985 }
1986 Value(uint8_t value)
1987 : value_{ value } {
1988 }
1989
1990 inline static constexpr uint32_t size() {
1991 return static_cast<uint32_t>(sizeof(Value));
1992 }
1993
1994 friend class UpsertContext;
1995 friend class ReadContext;
1996
1997 private:
1998 union {
1999 uint8_t value_;
2000 std::atomic<uint8_t> atomic_value_;
2001 };
2002 };
2003
2004 class UpsertContext : public IAsyncContext {
2005 public:
2006 typedef Key key_t;
2007 typedef Value value_t;
2008
2009 UpsertContext(uint16_t* key, uint64_t key_length)
2010 : key_{ key, key_length } {
2011 }
2012
2013 /// Copy (and deep-copy) constructor.
2014 UpsertContext(const UpsertContext& other)
2015 : key_{ other.key_ } {
2016 }
2017
2018 /// The implicit and explicit interfaces require a key() accessor.
2019 inline const Key& key() const {
2020 return key_;
2021 }
2022 inline static constexpr uint32_t value_size() {
2023 return sizeof(value_t);
2024 }
2025 /// Non-atomic and atomic Put() methods.
2026 inline void Put(Value& value) {
2027 value.value_ = 23;
2028 }
2029 inline bool PutAtomic(Value& value) {
2030 value.atomic_value_.store(42);
2031 return true;
2032 }
2033
2034 protected:
2035 /// The explicit interface requires a DeepCopy_Internal() implementation.
2036 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2037 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2038 }
2039
2040 private:
2041 Key key_;
2042 };
2043
2044 class ReadContext : public IAsyncContext {
2045 public:
2046 typedef Key key_t;
2047 typedef Value value_t;
2048
2049 ReadContext(uint16_t* key, uint64_t key_length)
2050 : key_{ key, key_length } {
2051 }
2052
2053 /// Copy (and deep-copy) constructor.
2054 ReadContext(const ReadContext& other)
2055 : key_{ other.key_ } {
2056 }
2057
2058 /// The implicit and explicit interfaces require a key() accessor.
2059 inline const Key& key() const {
2060 return key_;
2061 }
2062
2063 inline void Get(const Value& value) {
2064 // All reads should be atomic (from the mutable tail).
2065 ASSERT_TRUE(false);
2066 }
2067 inline void GetAtomic(const Value& value) {
2068 output = value.atomic_value_.load();
2069 }
2070
2071 protected:
2072 /// The explicit interface requires a DeepCopy_Internal() implementation.
2073 Status DeepCopy_Internal(IAsyncContext*& context_copy) {
2074 return IAsyncContext::DeepCopy_Internal(*this, context_copy);
2075 }
2076
2077 private:
2078 Key key_;
2079 public:
2080 uint8_t output;
2081 };
2082
2083 FasterKv<Key, Value, FASTER::device::NullDisk> store { 128, 1073741824, "" };
2084
2085 store.StartSession();
2086
2087 // Insert.
2088 for(size_t idx = 1; idx < 256; ++idx) {
2089 auto callback = [](IAsyncContext* ctxt, Status result) {
2090 // In-memory test.
2091 ASSERT_TRUE(false);
2092 };
2093
2094 // Create the key as a variable length array
2095 uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t));
2096 for (size_t j = 0; j < idx; ++j) {
2097 key[j] = 42;
2098 }
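    // The UpsertContext's Key keeps this pointer and frees it in its destructor, so the buffer isn't leaked.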
2099
2100 UpsertContext context{ key, idx };
2101 Status result = store.Upsert(context, callback, 1);
2102 ASSERT_EQ(Status::Ok, result);
2103 }
2104 // Read.
2105 for(size_t idx = 1; idx < 256; ++idx) {
2106 auto callback = [](IAsyncContext* ctxt, Status result) {
2107 // In-memory test.
2108 ASSERT_TRUE(false);
2109 };
2110
2111 // Create the key as a variable length array
2112 uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t));
2113 for (size_t j = 0; j < idx; ++j) {
2114 key[j] = 42;
2115 }
2116
2117 ReadContext context{ key, idx };
2118 Status result = store.Read(context, callback, 1);
2119 ASSERT_EQ(Status::Ok, result);
2120 // All upserts should have inserts (non-atomic).
2121 ASSERT_EQ(23, context.output);
2122 }
2123 // Update.
2124 for(size_t idx = 1; idx < 256; ++idx) {
2125 auto callback = [](IAsyncContext* ctxt, Status result) {
2126 // In-memory test.
2127 ASSERT_TRUE(false);
2128 };
2129
2130 // Create the key as a variable length array
2131 uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t));
2132 for (size_t j = 0; j < idx; ++j) {
2133 key[j] = 42;
2134 }
2135
2136 UpsertContext context{ key, idx };
2137 Status result = store.Upsert(context, callback, 1);
2138 ASSERT_EQ(Status::Ok, result);
2139 }
2140 // Read again.
2141 for(size_t idx = 1; idx < 256; ++idx) {
2142 auto callback = [](IAsyncContext* ctxt, Status result) {
2143 // In-memory test.
2144 ASSERT_TRUE(false);
2145 };
2146
2147 // Create the key as a variable length array
2148 uint16_t* key = (uint16_t*) malloc(idx * sizeof(uint16_t));
2149 for (size_t j = 0; j < idx; ++j) {
2150 key[j] = 42;
2151 }
2152
2153 ReadContext context{ key, idx };
2154 Status result = store.Read(context, callback, 1);
2155 ASSERT_EQ(Status::Ok, result);
2156 // All upserts should have updates (atomic).
2157 ASSERT_EQ(42, context.output);
2158 }
2159
2160 store.StopSession();
2161}
2162
2163int main(int argc, char** argv) {
2164 ::testing::InitGoogleTest(&argc, argv);
2165 return RUN_ALL_TESTS();
2166}