1 | // Copyright (c) Microsoft Corporation. All rights reserved. |
2 | // Licensed under the MIT license. |
3 | |
4 | #pragma once |
5 | |
6 | #include <experimental/filesystem> |
7 | |
8 | using namespace FASTER; |
9 | |
10 | /// Disk's log uses 64 MB segments. |
11 | typedef FASTER::device::FileSystemDisk<handler_t, 67108864L> disk_t; |
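
// handler_t (the async I/O handler) and CLASS (the test-suite name) are
// assumed to be defined by the test file that includes this header.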
12 | |
13 | TEST(CLASS, UpsertRead_Serial) { |
14 | class Key { |
15 | public: |
16 | Key(uint64_t pt1, uint64_t pt2) |
17 | : pt1_{ pt1 } |
18 | , pt2_{ pt2 } { |
19 | } |
20 | |
21 | inline static constexpr uint32_t size() { |
22 | return static_cast<uint32_t>(sizeof(Key)); |
23 | } |
24 | inline KeyHash GetHash() const { |
25 | std::hash<uint64_t> hash_fn; |
26 | return KeyHash{ hash_fn(pt1_) }; |
27 | } |
28 | |
29 | /// Comparison operators. |
30 | inline bool operator==(const Key& other) const { |
31 | return pt1_ == other.pt1_ && |
32 | pt2_ == other.pt2_; |
33 | } |
34 | inline bool operator!=(const Key& other) const { |
35 | return pt1_ != other.pt1_ || |
36 | pt2_ != other.pt2_; |
37 | } |
38 | |
39 | private: |
40 | uint64_t pt1_; |
41 | uint64_t pt2_; |
42 | }; |
43 | |
44 | class UpsertContext; |
45 | class ReadContext; |
46 | |
47 | class Value { |
48 | public: |
49 | Value() |
50 | : gen_{ 0 } |
51 | , value_{ 0 } |
52 | , length_{ 0 } { |
53 | } |
54 | |
55 | inline static constexpr uint32_t size() { |
56 | return static_cast<uint32_t>(sizeof(Value)); |
57 | } |
58 | |
59 | friend class UpsertContext; |
60 | friend class ReadContext; |
61 | |
62 | private: |
63 | std::atomic<uint64_t> gen_; |
64 | uint8_t value_[1014]; |
65 | uint16_t length_; |
66 | }; |
67 | static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024" ); |
68 | static_assert(alignof(Value) == 8, "alignof(Value) != 8" ); |
69 | |
70 | class UpsertContext : public IAsyncContext { |
71 | public: |
72 | typedef Key key_t; |
73 | typedef Value value_t; |
74 | |
75 | UpsertContext(const Key& key, uint8_t val) |
76 | : key_{ key } |
77 | , val_{ val } { |
78 | } |
79 | |
80 | /// Copy (and deep-copy) constructor. |
81 | UpsertContext(const UpsertContext& other) |
82 | : key_{ other.key_ } |
83 | , val_{ other.val_ } { |
84 | } |
85 | |
86 | /// The implicit and explicit interfaces require a key() accessor. |
87 | inline const Key& key() const { |
88 | return key_; |
89 | } |
90 | inline static constexpr uint32_t value_size() { |
91 | return sizeof(value_t); |
92 | } |
93 | /// Non-atomic and atomic Put() methods. |
94 | inline void Put(Value& value) { |
95 | value.gen_ = 0; |
96 | std::memset(value.value_, val_, val_); |
97 | value.length_ = val_; |
98 | } |
99 | inline bool PutAtomic(Value& value) { |
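      // gen_ doubles as a spinlock: UINT64_MAX marks the value as locked for
      // writing; any other value is the current generation number, which
      // readers use to validate their snapshot (see ReadContext::GetAtomic).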
100 | // Get the lock on the value. |
101 | uint64_t expected_gen; |
102 | bool success; |
103 | do { |
104 | do { |
          // Spin until the other thread releases the lock.
106 | expected_gen = value.gen_.load(); |
107 | } while(expected_gen == UINT64_MAX); |
108 | // Try to get the lock. |
109 | success = value.gen_.compare_exchange_weak(expected_gen, UINT64_MAX); |
110 | } while(!success); |
111 | |
112 | std::memset(value.value_, val_, val_); |
113 | value.length_ = val_; |
114 | // Increment the value's generation number. |
115 | value.gen_.store(expected_gen + 1); |
116 | return true; |
117 | } |
118 | |
119 | protected: |
120 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
121 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
122 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
123 | } |
124 | |
125 | private: |
126 | Key key_; |
127 | uint8_t val_; |
128 | }; |
129 | |
130 | class ReadContext : public IAsyncContext { |
131 | public: |
132 | typedef Key key_t; |
133 | typedef Value value_t; |
134 | |
135 | ReadContext(Key key, uint8_t expected) |
136 | : key_{ key } |
137 | , expected_{ expected } { |
138 | } |
139 | |
140 | /// Copy (and deep-copy) constructor. |
141 | ReadContext(const ReadContext& other) |
142 | : key_{ other.key_ } |
143 | , expected_{ other.expected_ } { |
144 | } |
145 | |
146 | /// The implicit and explicit interfaces require a key() accessor. |
147 | inline const Key& key() const { |
148 | return key_; |
149 | } |
150 | |
151 | inline void Get(const Value& value) { |
      // This is a paging test, so we expect some reads to be served from disk.
153 | ASSERT_EQ(expected_, value.length_); |
154 | ASSERT_EQ(expected_, value.value_[expected_ - 5]); |
155 | } |
156 | inline void GetAtomic(const Value& value) { |
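      // Seqlock-style optimistic read: copy the fields, then re-read gen_ and
      // retry until it is unchanged, so length_ and value_ come from a single
      // completed write.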
157 | uint64_t post_gen = value.gen_.load(); |
158 | uint64_t pre_gen; |
159 | uint16_t len; |
160 | uint8_t val; |
161 | do { |
        // This iteration's pre-read gen # is the previous iteration's post-read gen #.
163 | pre_gen = post_gen; |
164 | len = value.length_; |
165 | val = value.value_[len - 5]; |
166 | post_gen = value.gen_.load(); |
167 | } while(pre_gen != post_gen); |
168 | ASSERT_EQ(expected_, static_cast<uint8_t>(len)); |
169 | ASSERT_EQ(expected_, val); |
170 | } |
171 | |
172 | protected: |
173 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
174 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
175 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
176 | } |
177 | |
178 | private: |
179 | Key key_; |
180 | uint8_t expected_; |
181 | }; |
182 | |
183 | std::experimental::filesystem::create_directories("logs" ); |
184 | |
185 | // 8 pages! |
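  // Constructor arguments are assumed to be: hash-table entry count (262144),
  // in-memory log size in bytes (256 MB, i.e. 8 x 32 MB pages), log directory,
  // and the fraction of the in-memory log kept mutable (0.5).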
186 | FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs" , 0.5 }; |
187 | |
188 | Guid session_id = store.StartSession(); |
189 | |
190 | constexpr size_t kNumRecords = 250000; |
191 | |
192 | // Insert. |
193 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
194 | auto callback = [](IAsyncContext* ctxt, Status result) { |
195 | // Upserts don't go to disk. |
196 | ASSERT_TRUE(false); |
197 | }; |
198 | |
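    // Periodic Refresh() lets this thread participate in epoch maintenance so
    // the store can safely advance shared log state while we keep inserting.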
199 | if(idx % 256 == 0) { |
200 | store.Refresh(); |
201 | } |
202 | |
203 | UpsertContext context{ Key{idx, idx}, 25 }; |
204 | Status result = store.Upsert(context, callback, 1); |
205 | ASSERT_EQ(Status::Ok, result); |
206 | } |
207 | // Read. |
208 | static std::atomic<uint64_t> records_read{ 0 }; |
209 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
210 | auto callback = [](IAsyncContext* ctxt, Status result) { |
211 | CallbackContext<ReadContext> context{ ctxt }; |
212 | ASSERT_EQ(Status::Ok, result); |
213 | ++records_read; |
214 | }; |
215 | |
216 | if(idx % 256 == 0) { |
217 | store.Refresh(); |
218 | } |
219 | |
220 | ReadContext context{ Key{ idx, idx}, 25 }; |
221 | Status result = store.Read(context, callback, 1); |
222 | if(result == Status::Ok) { |
223 | ++records_read; |
224 | } else { |
225 | ASSERT_EQ(Status::Pending, result); |
226 | } |
227 | } |
228 | |
229 | ASSERT_LT(records_read.load(), kNumRecords); |
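  // CompletePending(true) should block until all outstanding async reads have
  // finished, invoking the callbacks above for records served from disk.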
230 | bool result = store.CompletePending(true); |
231 | ASSERT_TRUE(result); |
232 | ASSERT_EQ(kNumRecords, records_read.load()); |
233 | |
234 | // Update. |
235 | static std::atomic<uint64_t> records_updated{ 0 }; |
236 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
237 | auto callback = [](IAsyncContext* ctxt, Status result) { |
238 | // Upserts don't go to disk. |
239 | ASSERT_TRUE(false); |
240 | }; |
241 | |
242 | if(idx % 256 == 0) { |
243 | store.Refresh(); |
244 | } |
245 | |
246 | UpsertContext context{ Key{ idx, idx }, 87 }; |
247 | Status result = store.Upsert(context, callback, 1); |
248 | if(result == Status::Ok) { |
249 | ++records_updated; |
250 | } else { |
251 | ASSERT_EQ(Status::Pending, result); |
252 | } |
253 | } |
254 | |
255 | ASSERT_EQ(kNumRecords, records_updated.load()); |
256 | result = store.CompletePending(true); |
257 | ASSERT_TRUE(result); |
258 | |
259 | // Read again. |
  records_read = 0;
261 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
262 | auto callback = [](IAsyncContext* ctxt, Status result) { |
263 | CallbackContext<ReadContext> context{ ctxt }; |
264 | ASSERT_EQ(Status::Ok, result); |
265 | ++records_read; |
266 | }; |
267 | |
268 | if(idx % 256 == 0) { |
269 | store.Refresh(); |
270 | } |
271 | |
272 | ReadContext context{ Key{ idx, idx }, 87 }; |
273 | Status result = store.Read(context, callback, 1); |
274 | if(result == Status::Ok) { |
275 | ++records_read; |
276 | } else { |
277 | ASSERT_EQ(Status::Pending, result); |
278 | } |
279 | } |
280 | |
281 | ASSERT_LT(records_read.load(), kNumRecords); |
282 | result = store.CompletePending(true); |
283 | ASSERT_TRUE(result); |
284 | ASSERT_EQ(kNumRecords, records_read.load()); |
285 | |
286 | store.StopSession(); |
287 | } |
288 | |
289 | TEST(CLASS, UpsertRead_Concurrent) { |
290 | class UpsertContext; |
291 | class ReadContext; |
292 | |
293 | class Key { |
294 | public: |
295 | Key(uint64_t pt1, uint64_t pt2) |
296 | : pt1_{ pt1 } |
297 | , pt2_{ pt2 } { |
298 | } |
299 | |
300 | inline static constexpr uint32_t size() { |
301 | return static_cast<uint32_t>(sizeof(Key)); |
302 | } |
303 | inline KeyHash GetHash() const { |
304 | std::hash<uint64_t> hash_fn; |
305 | return KeyHash{ hash_fn(pt1_) }; |
306 | } |
307 | |
308 | /// Comparison operators. |
309 | inline bool operator==(const Key& other) const { |
310 | return pt1_ == other.pt1_ && |
311 | pt2_ == other.pt2_; |
312 | } |
313 | inline bool operator!=(const Key& other) const { |
314 | return pt1_ != other.pt1_ || |
315 | pt2_ != other.pt2_; |
316 | } |
317 | |
318 | friend class UpsertContext; |
319 | friend class ReadContext; |
320 | |
321 | private: |
322 | uint64_t pt1_; |
323 | uint64_t pt2_; |
324 | }; |
325 | |
326 | class Value { |
327 | public: |
328 | Value() |
329 | : gen_{ 0 } |
330 | , value_{ 0 } |
331 | , length_{ 0 } { |
332 | } |
333 | |
334 | inline static constexpr uint32_t size() { |
335 | return static_cast<uint32_t>(sizeof(Value)); |
336 | } |
337 | |
338 | friend class UpsertContext; |
339 | friend class ReadContext; |
340 | |
341 | private: |
342 | std::atomic<uint64_t> gen_; |
343 | uint8_t value_[1014]; |
344 | uint16_t length_; |
345 | }; |
346 | static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024" ); |
347 | static_assert(alignof(Value) == 8, "alignof(Value) != 8" ); |
348 | |
349 | class UpsertContext : public IAsyncContext { |
350 | public: |
351 | typedef Key key_t; |
352 | typedef Value value_t; |
353 | |
354 | UpsertContext(const Key& key, uint8_t val) |
355 | : key_{ key } |
356 | , val_{ val } { |
357 | } |
358 | |
359 | /// Copy (and deep-copy) constructor. |
360 | UpsertContext(const UpsertContext& other) |
361 | : key_{ other.key_ } |
362 | , val_{ other.val_ } { |
363 | } |
364 | |
365 | /// The implicit and explicit interfaces require a key() accessor. |
366 | inline const Key& key() const { |
367 | return key_; |
368 | } |
369 | inline static constexpr uint32_t value_size() { |
370 | return sizeof(value_t); |
371 | } |
372 | /// Non-atomic and atomic Put() methods. |
373 | inline void Put(Value& value) { |
374 | value.gen_ = 0; |
375 | std::memset(value.value_, val_, val_); |
376 | value.length_ = val_; |
377 | } |
378 | inline bool PutAtomic(Value& value) { |
379 | // Get the lock on the value. |
380 | uint64_t expected_gen; |
381 | bool success; |
382 | do { |
383 | do { |
          // Spin until the other thread releases the lock.
385 | expected_gen = value.gen_.load(); |
386 | } while(expected_gen == UINT64_MAX); |
387 | // Try to get the lock. |
388 | success = value.gen_.compare_exchange_weak(expected_gen, UINT64_MAX); |
389 | } while(!success); |
390 | |
391 | std::memset(value.value_, val_, val_); |
392 | value.length_ = val_; |
393 | // Increment the value's generation number. |
394 | value.gen_.store(expected_gen + 1); |
395 | return true; |
396 | } |
397 | |
398 | protected: |
399 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
400 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
401 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
402 | } |
403 | |
404 | private: |
405 | Key key_; |
406 | uint8_t val_; |
407 | }; |
408 | |
409 | class ReadContext : public IAsyncContext { |
410 | public: |
411 | typedef Key key_t; |
412 | typedef Value value_t; |
413 | |
414 | ReadContext(Key key, uint8_t expected) |
415 | : key_{ key } |
416 | , expected_{ expected } { |
417 | } |
418 | |
419 | /// Copy (and deep-copy) constructor. |
420 | ReadContext(const ReadContext& other) |
421 | : key_{ other.key_ } |
422 | , expected_{ other.expected_ } { |
423 | } |
424 | |
425 | /// The implicit and explicit interfaces require a key() accessor. |
426 | inline const Key& key() const { |
427 | return key_; |
428 | } |
429 | |
430 | inline void Get(const Value& value) { |
      // This is a paging test, so we expect some reads to be served from disk.
432 | ASSERT_EQ(expected_, value.length_); |
433 | ASSERT_EQ(expected_, value.value_[expected_ - 5]); |
434 | } |
435 | inline void GetAtomic(const Value& value) { |
436 | uint64_t post_gen = value.gen_.load(); |
437 | uint64_t pre_gen; |
438 | uint16_t len; |
439 | uint8_t val; |
440 | do { |
        // This iteration's pre-read gen # is the previous iteration's post-read gen #.
442 | pre_gen = post_gen; |
443 | len = value.length_; |
444 | val = value.value_[len - 5]; |
445 | post_gen = value.gen_.load(); |
446 | } while(pre_gen != post_gen); |
447 | ASSERT_EQ(expected_, val); |
448 | } |
449 | |
450 | protected: |
451 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
452 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
453 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
454 | } |
455 | |
456 | private: |
457 | Key key_; |
458 | uint8_t expected_; |
459 | }; |
460 | |
461 | std::experimental::filesystem::create_directories("logs" ); |
462 | |
463 | // 8 pages! |
464 | FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs" , 0.5 }; |
465 | |
466 | static constexpr size_t kNumRecords = 250000; |
467 | static constexpr size_t kNumThreads = 2; |
468 | |
469 | static std::atomic<uint64_t> num_writes{ 0 }; |
470 | |
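  // Each upsert worker opens its own session and writes a disjoint slice of
  // the key space, so together the workers insert all kNumRecords keys.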
471 | auto upsert_worker = [](FasterKv<Key, Value, disk_t>* store_, |
472 | size_t thread_idx, uint8_t val) { |
473 | Guid session_id = store_->StartSession(); |
474 | |
475 | for(size_t idx = 0; idx < kNumRecords / kNumThreads; ++idx) { |
476 | auto callback = [](IAsyncContext* ctxt, Status result) { |
        // Upserts don't go to disk.
478 | ASSERT_TRUE(false); |
479 | }; |
480 | |
481 | if(idx % 256 == 0) { |
482 | store_->Refresh(); |
483 | } |
484 | |
485 | uint64_t key_component = thread_idx * (kNumRecords / kNumThreads) + idx; |
486 | UpsertContext context{ Key{ key_component, key_component }, val }; |
487 | Status result = store_->Upsert(context, callback, 1); |
488 | ASSERT_EQ(Status::Ok, result); |
489 | ++num_writes; |
490 | } |
491 | |
492 | store_->StopSession(); |
493 | }; |
494 | |
495 | // Insert. |
496 | std::deque<std::thread> threads{}; |
497 | for(size_t idx = 0; idx < kNumThreads; ++idx) { |
498 | threads.emplace_back(upsert_worker, &store, idx, 25); |
499 | } |
500 | for(auto& thread : threads) { |
501 | thread.join(); |
502 | } |
503 | |
504 | ASSERT_EQ(kNumRecords, num_writes.load()); |
505 | |
506 | // Read. |
507 | Guid session_id = store.StartSession(); |
508 | |
509 | static std::atomic<uint64_t> records_read{ 0 }; |
510 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
511 | auto callback = [](IAsyncContext* ctxt, Status result) { |
512 | CallbackContext<ReadContext> context{ ctxt }; |
513 | ASSERT_EQ(Status::Ok, result); |
514 | ++records_read; |
515 | }; |
516 | |
517 | if(idx % 256 == 0) { |
518 | store.Refresh(); |
519 | } |
520 | |
521 | ReadContext context{ Key{ idx, idx }, 25 }; |
522 | Status result = store.Read(context, callback, 1); |
523 | if(result == Status::Ok) { |
524 | ++records_read; |
525 | } else { |
526 | ASSERT_EQ(Status::Pending, result) << idx; |
527 | } |
528 | } |
529 | |
530 | ASSERT_LT(records_read.load(), kNumRecords); |
531 | bool result = store.CompletePending(true); |
532 | ASSERT_TRUE(result); |
533 | ASSERT_EQ(kNumRecords, records_read.load()); |
534 | |
  // Update.
536 | num_writes = 0; |
537 | threads.clear(); |
538 | for(size_t idx = 0; idx < kNumThreads; ++idx) { |
539 | threads.emplace_back(upsert_worker, &store, idx, 87); |
540 | } |
541 | for(auto& thread : threads) { |
542 | thread.join(); |
543 | } |
544 | |
545 | ASSERT_EQ(kNumRecords, num_writes.load()); |
546 | |
547 | // Delete some old copies of records (160 MB) that we no longer need. |
548 | static constexpr uint64_t kNewBeginAddress{ 167772160L }; |
549 | static std::atomic<bool> truncated{ false }; |
550 | static std::atomic<bool> complete{ false }; |
551 | auto truncate_callback = [](uint64_t offset) { |
552 | ASSERT_LE(offset, kNewBeginAddress); |
553 | truncated = true; |
554 | }; |
555 | auto complete_callback = []() { |
556 | complete = true; |
557 | }; |
558 | |
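  // ShiftBeginAddress() truncates the log up to the new begin address; the
  // truncate callback is presumably fired once the underlying files have been
  // trimmed and the completion callback once the shift is fully applied, so we
  // spin on both flags below.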
559 | result = store.ShiftBeginAddress(Address{ kNewBeginAddress }, truncate_callback, complete_callback); |
560 | ASSERT_TRUE(result); |
561 | |
562 | while(!truncated || !complete) { |
563 | store.CompletePending(false); |
564 | } |
565 | |
566 | // Read again. |
  records_read = 0;
568 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
569 | auto callback = [](IAsyncContext* ctxt, Status result) { |
570 | CallbackContext<ReadContext> context{ ctxt }; |
571 | ASSERT_EQ(Status::Ok, result); |
572 | ++records_read; |
573 | }; |
574 | |
575 | if(idx % 256 == 0) { |
576 | store.Refresh(); |
577 | } |
578 | |
579 | ReadContext context{ Key{ idx, idx }, 87 }; |
580 | Status result = store.Read(context, callback, 1); |
581 | if(result == Status::Ok) { |
582 | ++records_read; |
583 | } else { |
584 | ASSERT_EQ(Status::Pending, result); |
585 | } |
586 | } |
587 | |
588 | ASSERT_LT(records_read.load(), kNumRecords); |
589 | result = store.CompletePending(true); |
590 | ASSERT_TRUE(result); |
591 | ASSERT_EQ(kNumRecords, records_read.load()); |
592 | |
593 | store.StopSession(); |
594 | } |
595 | |
596 | TEST(CLASS, Rmw) { |
597 | class Key { |
598 | public: |
599 | Key(uint64_t key) |
600 | : key_{ key } { |
601 | } |
602 | |
603 | inline static constexpr uint32_t size() { |
604 | return static_cast<uint32_t>(sizeof(Key)); |
605 | } |
606 | inline KeyHash GetHash() const { |
607 | return KeyHash{ Utility::GetHashCode(key_) }; |
608 | } |
609 | |
610 | /// Comparison operators. |
611 | inline bool operator==(const Key& other) const { |
612 | return key_ == other.key_; |
613 | } |
614 | inline bool operator!=(const Key& other) const { |
615 | return key_ != other.key_; |
616 | } |
617 | |
618 | private: |
619 | uint64_t key_; |
620 | }; |
621 | |
622 | class RmwContext; |
623 | |
624 | class Value { |
625 | public: |
626 | Value() |
627 | : counter_{ 0 } |
628 | , junk_{ 1 } { |
629 | } |
630 | |
631 | inline static constexpr uint32_t size() { |
632 | return static_cast<uint32_t>(sizeof(Value)); |
633 | } |
634 | |
635 | friend class RmwContext; |
636 | |
637 | private: |
638 | std::atomic<uint64_t> counter_; |
639 | uint8_t junk_[1016]; |
640 | }; |
641 | static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024" ); |
642 | static_assert(alignof(Value) == 8, "alignof(Value) != 8" ); |
643 | |
644 | class RmwContext : public IAsyncContext { |
645 | public: |
646 | typedef Key key_t; |
647 | typedef Value value_t; |
648 | |
649 | RmwContext(Key key, uint64_t incr) |
650 | : key_{ key } |
651 | , incr_{ incr } |
652 | , val_{ 0 } { |
653 | } |
654 | |
655 | /// Copy (and deep-copy) constructor. |
656 | RmwContext(const RmwContext& other) |
657 | : key_{ other.key_ } |
658 | , incr_{ other.incr_ } |
659 | , val_{ other.val_ } { |
660 | } |
661 | |
662 | inline const Key& key() const { |
663 | return key_; |
664 | } |
665 | inline static constexpr uint32_t value_size() { |
666 | return sizeof(value_t); |
667 | } |
668 | inline void RmwInitial(Value& value) { |
669 | value.counter_ = incr_; |
670 | val_ = value.counter_; |
671 | } |
672 | inline void RmwCopy(const Value& old_value, Value& value) { |
673 | value.counter_ = old_value.counter_ + incr_; |
674 | val_ = value.counter_; |
675 | } |
676 | inline bool RmwAtomic(Value& value) { |
677 | val_ = value.counter_.fetch_add(incr_) + incr_; |
678 | return true; |
679 | } |
680 | |
681 | inline uint64_t val() const { |
682 | return val_; |
683 | } |
684 | |
685 | protected: |
686 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
687 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
688 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
689 | } |
690 | |
691 | private: |
692 | Key key_; |
693 | uint64_t incr_; |
694 | |
695 | uint64_t val_; |
696 | }; |
697 | |
698 | std::experimental::filesystem::create_directories("logs" ); |
699 | |
700 | // 8 pages! |
701 | FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs" , 0.5 }; |
702 | |
703 | Guid session_id = store.StartSession(); |
704 | |
705 | constexpr size_t kNumRecords = 200000; |
706 | |
707 | // Initial RMW. |
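  // Each key is incremented by 3 on this pass and by 5 on the next, so the
  // callbacks expect val() == 3 here and val() == 8 in the second pass.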
708 | static std::atomic<uint64_t> records_touched{ 0 }; |
709 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
710 | auto callback = [](IAsyncContext* ctxt, Status result) { |
711 | CallbackContext<RmwContext> context{ ctxt }; |
712 | ASSERT_EQ(Status::Ok, result); |
713 | ASSERT_EQ(3, context->val()); |
714 | ++records_touched; |
715 | }; |
716 | |
717 | if(idx % 256 == 0) { |
718 | store.Refresh(); |
719 | } |
720 | |
721 | RmwContext context{ Key{ idx }, 3 }; |
722 | Status result = store.Rmw(context, callback, 1); |
723 | if(result == Status::Ok) { |
724 | ASSERT_EQ(3, context.val()); |
725 | ++records_touched; |
726 | } else { |
727 | ASSERT_EQ(Status::Pending, result); |
728 | } |
729 | } |
730 | |
731 | bool result = store.CompletePending(true); |
732 | ASSERT_TRUE(result); |
733 | ASSERT_EQ(kNumRecords, records_touched.load()); |
734 | |
735 | // Second RMW. |
736 | records_touched = 0; |
737 | for(size_t idx = kNumRecords; idx > 0; --idx) { |
738 | auto callback = [](IAsyncContext* ctxt, Status result) { |
739 | CallbackContext<RmwContext> context{ ctxt }; |
740 | ASSERT_EQ(Status::Ok, result); |
741 | ASSERT_EQ(8, context->val()); |
742 | ++records_touched; |
743 | }; |
744 | |
745 | if(idx % 256 == 0) { |
746 | store.Refresh(); |
747 | } |
748 | |
749 | RmwContext context{ Key{ idx - 1 }, 5 }; |
750 | Status result = store.Rmw(context, callback, 1); |
751 | if(result == Status::Ok) { |
752 | ASSERT_EQ(8, context.val()) << idx - 1; |
753 | ++records_touched; |
754 | } else { |
755 | ASSERT_EQ(Status::Pending, result); |
756 | } |
757 | } |
758 | |
759 | ASSERT_LT(records_touched.load(), kNumRecords); |
760 | result = store.CompletePending(true); |
761 | ASSERT_TRUE(result); |
762 | ASSERT_EQ(kNumRecords, records_touched.load()); |
763 | |
764 | store.StopSession(); |
765 | } |
766 | |
767 | TEST(CLASS, Rmw_Concurrent) { |
768 | class Key { |
769 | public: |
770 | Key(uint64_t key) |
771 | : key_{ key } { |
772 | } |
773 | |
774 | inline static constexpr uint32_t size() { |
775 | return static_cast<uint32_t>(sizeof(Key)); |
776 | } |
777 | inline KeyHash GetHash() const { |
778 | return KeyHash{ Utility::GetHashCode(key_) }; |
779 | } |
780 | |
781 | /// Comparison operators. |
782 | inline bool operator==(const Key& other) const { |
783 | return key_ == other.key_; |
784 | } |
785 | inline bool operator!=(const Key& other) const { |
786 | return key_ != other.key_; |
787 | } |
788 | |
789 | private: |
790 | uint64_t key_; |
791 | }; |
792 | |
793 | class RmwContext; |
794 | class ReadContext; |
795 | |
796 | class Value { |
797 | public: |
798 | Value() |
799 | : counter_{ 0 } |
800 | , junk_{ 1 } { |
801 | } |
802 | |
803 | inline static constexpr uint32_t size() { |
804 | return static_cast<uint32_t>(sizeof(Value)); |
805 | } |
806 | |
807 | friend class RmwContext; |
808 | friend class ReadContext; |
809 | |
810 | private: |
811 | std::atomic<uint64_t> counter_; |
812 | uint8_t junk_[1016]; |
813 | }; |
814 | static_assert(sizeof(Value) == 1024, "sizeof(Value) != 1024" ); |
815 | static_assert(alignof(Value) == 8, "alignof(Value) != 8" ); |
816 | |
817 | class RmwContext : public IAsyncContext { |
818 | public: |
819 | typedef Key key_t; |
820 | typedef Value value_t; |
821 | |
822 | RmwContext(Key key, uint64_t incr) |
823 | : key_{ key } |
824 | , incr_{ incr } { |
825 | } |
826 | |
827 | /// Copy (and deep-copy) constructor. |
828 | RmwContext(const RmwContext& other) |
829 | : key_{ other.key_ } |
830 | , incr_{ other.incr_ } { |
831 | } |
832 | |
833 | inline const Key& key() const { |
834 | return key_; |
835 | } |
836 | inline static constexpr uint32_t value_size() { |
837 | return sizeof(value_t); |
838 | } |
839 | inline void RmwInitial(Value& value) { |
840 | value.counter_ = incr_; |
841 | } |
842 | inline void RmwCopy(const Value& old_value, Value& value) { |
843 | value.counter_ = old_value.counter_ + incr_; |
844 | } |
845 | inline bool RmwAtomic(Value& value) { |
846 | value.counter_.fetch_add(incr_); |
847 | return true; |
848 | } |
849 | |
850 | protected: |
851 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
852 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
853 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
854 | } |
855 | |
856 | private: |
857 | Key key_; |
858 | uint64_t incr_; |
859 | }; |
860 | |
861 | class ReadContext : public IAsyncContext { |
862 | public: |
863 | typedef Key key_t; |
864 | typedef Value value_t; |
865 | |
866 | ReadContext(Key key) |
867 | : key_{ key } { |
868 | } |
869 | |
870 | /// Copy (and deep-copy) constructor. |
871 | ReadContext(const ReadContext& other) |
872 | : key_{ other.key_ } { |
873 | } |
874 | |
875 | /// The implicit and explicit interfaces require a key() accessor. |
876 | inline const Key& key() const { |
877 | return key_; |
878 | } |
879 | |
880 | inline void Get(const Value& value) { |
881 | counter = value.counter_.load(std::memory_order_acquire); |
882 | } |
883 | inline void GetAtomic(const Value& value) { |
884 | counter = value.counter_.load(); |
885 | } |
886 | |
887 | protected: |
888 | /// The explicit interface requires a DeepCopy_Internal() implementation. |
889 | Status DeepCopy_Internal(IAsyncContext*& context_copy) { |
890 | return IAsyncContext::DeepCopy_Internal(*this, context_copy); |
891 | } |
892 | |
893 | private: |
894 | Key key_; |
895 | public: |
896 | uint64_t counter; |
897 | }; |
898 | |
899 | static constexpr size_t kNumRecords = 150000; |
900 | static constexpr size_t kNumThreads = 2; |
901 | |
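  // Both workers RMW the entire key range, so after each pass every counter
  // equals incr * kNumThreads: 7 * 2 = 14 after the first pass and
  // (7 + 6) * 2 = 26 after the second.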
902 | auto rmw_worker = [](FasterKv<Key, Value, disk_t>* store_, uint64_t incr) { |
903 | Guid session_id = store_->StartSession(); |
904 | for(size_t idx = 0; idx < kNumRecords; ++idx) { |
905 | auto callback = [](IAsyncContext* ctxt, Status result) { |
906 | CallbackContext<RmwContext> context{ ctxt }; |
907 | ASSERT_EQ(Status::Ok, result); |
908 | }; |
909 | |
910 | if(idx % 256 == 0) { |
911 | store_->Refresh(); |
912 | } |
913 | |
914 | RmwContext context{ Key{ idx }, incr }; |
915 | Status result = store_->Rmw(context, callback, 1); |
916 | if(result != Status::Ok) { |
917 | ASSERT_EQ(Status::Pending, result); |
918 | } |
919 | } |
920 | bool result = store_->CompletePending(true); |
921 | ASSERT_TRUE(result); |
922 | store_->StopSession(); |
923 | }; |
924 | |
925 | auto read_worker1 = [](FasterKv<Key, Value, disk_t>* store_, size_t thread_idx) { |
926 | Guid session_id = store_->StartSession(); |
927 | for(size_t idx = 0; idx < kNumRecords / kNumThreads; ++idx) { |
928 | auto callback = [](IAsyncContext* ctxt, Status result) { |
929 | CallbackContext<ReadContext> context{ ctxt }; |
930 | ASSERT_EQ(Status::Ok, result); |
931 | ASSERT_EQ(7 * kNumThreads, context->counter); |
932 | }; |
933 | |
934 | if(idx % 256 == 0) { |
935 | store_->Refresh(); |
936 | } |
937 | |
      ReadContext context{ Key{ thread_idx * (kNumRecords / kNumThreads) + idx } };
939 | Status result = store_->Read(context, callback, 1); |
940 | if(result == Status::Ok) { |
941 | ASSERT_EQ(7 * kNumThreads, context.counter); |
942 | } else { |
943 | ASSERT_EQ(Status::Pending, result); |
944 | } |
945 | } |
946 | bool result = store_->CompletePending(true); |
947 | ASSERT_TRUE(result); |
948 | store_->StopSession(); |
949 | }; |
950 | |
951 | auto read_worker2 = [](FasterKv<Key, Value, disk_t>* store_, size_t thread_idx) { |
952 | Guid session_id = store_->StartSession(); |
953 | for(size_t idx = 0; idx < kNumRecords / kNumThreads; ++idx) { |
954 | auto callback = [](IAsyncContext* ctxt, Status result) { |
955 | CallbackContext<ReadContext> context{ ctxt }; |
956 | ASSERT_EQ(Status::Ok, result); |
957 | ASSERT_EQ(13 * kNumThreads, context->counter); |
958 | }; |
959 | |
960 | if(idx % 256 == 0) { |
961 | store_->Refresh(); |
962 | } |
963 | |
      ReadContext context{ Key{ thread_idx * (kNumRecords / kNumThreads) + idx } };
965 | Status result = store_->Read(context, callback, 1); |
966 | if(result == Status::Ok) { |
967 | ASSERT_EQ(13 * kNumThreads, context.counter); |
968 | } else { |
969 | ASSERT_EQ(Status::Pending, result); |
970 | } |
971 | } |
972 | bool result = store_->CompletePending(true); |
973 | ASSERT_TRUE(result); |
974 | store_->StopSession(); |
975 | }; |
976 | |
977 | std::experimental::filesystem::create_directories("logs" ); |
978 | |
979 | // 8 pages! |
980 | FasterKv<Key, Value, disk_t> store{ 262144, 268435456, "logs" , 0.5 }; |
981 | |
982 | // Initial RMW. |
983 | std::deque<std::thread> threads{}; |
  for(size_t idx = 0; idx < kNumThreads; ++idx) {
985 | threads.emplace_back(rmw_worker, &store, 7); |
986 | } |
987 | for(auto& thread : threads) { |
988 | thread.join(); |
989 | } |
990 | |
991 | // Read. |
992 | threads.clear(); |
  for(size_t idx = 0; idx < kNumThreads; ++idx) {
994 | threads.emplace_back(read_worker1, &store, idx); |
995 | } |
996 | for(auto& thread : threads) { |
997 | thread.join(); |
998 | } |
999 | |
1000 | // Second RMW. |
1001 | threads.clear(); |
  for(size_t idx = 0; idx < kNumThreads; ++idx) {
1003 | threads.emplace_back(rmw_worker, &store, 6); |
1004 | } |
1005 | for(auto& thread : threads) { |
1006 | thread.join(); |
1007 | } |
1008 | |
1009 | // Read again. |
1010 | threads.clear(); |
  for(size_t idx = 0; idx < kNumThreads; ++idx) {
1012 | threads.emplace_back(read_worker2, &store, idx); |
1013 | } |
1014 | for(auto& thread : threads) { |
1015 | thread.join(); |
1016 | } |
1017 | } |
1018 | |