// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#include <atomic>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <random>
#include <string>
#include <thread>

#include "file.h"

#include "core/auto_ptr.h"
#include "core/faster.h"
#include "device/null_disk.h"
using namespace std::chrono_literals;
using namespace FASTER::core;

/// Basic YCSB benchmark.

enum class Op : uint8_t {
  Insert = 0,
  Read = 1,
  Upsert = 2,
  Scan = 3,
  ReadModifyWrite = 4,
};
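
// Operation types read from the YCSB transaction mix; Scan is declared
// for completeness but is rejected at runtime in thread_run_benchmark().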

enum class Workload {
  A_50_50 = 0,
  RMW_100 = 1,
};

static constexpr uint64_t kInitCount = 250000000;
static constexpr uint64_t kTxnCount = 1000000000;
static constexpr uint64_t kChunkSize = 3200;
static constexpr uint64_t kRefreshInterval = 64;
static constexpr uint64_t kCompletePendingInterval = 1600;

static_assert(kInitCount % kChunkSize == 0, "kInitCount % kChunkSize != 0");
static_assert(kTxnCount % kChunkSize == 0, "kTxnCount % kChunkSize != 0");
static_assert(kCompletePendingInterval % kRefreshInterval == 0,
              "kCompletePendingInterval % kRefreshInterval != 0");

static constexpr uint64_t kNanosPerSecond = 1000000000;

static constexpr uint64_t kMaxKey = 268435456;
static constexpr uint64_t kRunSeconds = 360;
static constexpr uint64_t kCheckpointSeconds = 30;

aligned_unique_ptr_t<uint64_t> init_keys_;
aligned_unique_ptr_t<uint64_t> txn_keys_;
std::atomic<uint64_t> idx_{ 0 };
std::atomic<bool> done_{ false };
std::atomic<uint64_t> total_duration_{ 0 };
std::atomic<uint64_t> total_reads_done_{ 0 };
std::atomic<uint64_t> total_writes_done_{ 0 };

class ReadContext;
class UpsertContext;
class RmwContext;

/// This benchmark stores 8-byte keys in the key-value store.
class Key {
 public:
  Key(uint64_t key)
    : key_{ key } {
  }

  /// Methods and operators required by the (implicit) interface:
  inline static constexpr uint32_t size() {
    return static_cast<uint32_t>(sizeof(Key));
  }
  inline KeyHash GetHash() const {
    return KeyHash{ Utility::GetHashCode(key_) };
  }

  /// Comparison operators.
  inline bool operator==(const Key& other) const {
    return key_ == other.key_;
  }
  inline bool operator!=(const Key& other) const {
    return key_ != other.key_;
  }

 private:
  uint64_t key_;
};

/// This benchmark stores an 8-byte value in the key-value store.
class Value {
 public:
  Value()
    : value_{ 0 } {
  }

  Value(const Value& other)
    : value_{ other.value_ } {
  }

  Value(uint64_t value)
    : value_{ value } {
  }

  inline static constexpr uint32_t size() {
    return static_cast<uint32_t>(sizeof(Value));
  }

  friend class ReadContext;
  friend class UpsertContext;
  friend class RmwContext;

 private:
  union {
    uint64_t value_;
    std::atomic<uint64_t> atomic_value_;
  };
};
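
// The union gives two views of the same 8 bytes: value_ for single-writer
// paths (initial RMW, copies into new records) and atomic_value_ for
// lock-free in-place updates; this assumes std::atomic<uint64_t> adds no
// padding, which the check below enforces at compile time.
static_assert(sizeof(Value) == sizeof(uint64_t), "Value must stay 8 bytes");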

/// Class passed to store_t::Read().
class ReadContext : public IAsyncContext {
 public:
  typedef Key key_t;
  typedef Value value_t;

  ReadContext(uint64_t key)
    : key_{ key } {
  }

  /// Copy (and deep-copy) constructor.
  ReadContext(const ReadContext& other)
    : key_{ other.key_ } {
  }

  /// The implicit and explicit interfaces require a key() accessor.
  inline const Key& key() const {
    return key_;
  }

  // For this benchmark, we don't copy out, so these are no-ops.
  inline void Get(const value_t& value) { }
  inline void GetAtomic(const value_t& value) { }

 protected:
  /// The explicit interface requires a DeepCopy_Internal() implementation.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  Key key_;
};

/// Class passed to store_t::Upsert().
class UpsertContext : public IAsyncContext {
 public:
  typedef Key key_t;
  typedef Value value_t;

  UpsertContext(uint64_t key, uint64_t input)
    : key_{ key }
    , input_{ input } {
  }

  /// Copy (and deep-copy) constructor.
  UpsertContext(const UpsertContext& other)
    : key_{ other.key_ }
    , input_{ other.input_ } {
  }

  /// The implicit and explicit interfaces require a key() accessor.
  inline const Key& key() const {
    return key_;
  }
  inline static constexpr uint32_t value_size() {
    return sizeof(value_t);
  }

  /// Non-atomic and atomic Put() methods.
  inline void Put(value_t& value) {
    value.value_ = input_;
  }
  inline bool PutAtomic(value_t& value) {
    value.atomic_value_.store(input_);
    return true;
  }

 protected:
  /// The explicit interface requires a DeepCopy_Internal() implementation.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  Key key_;
  uint64_t input_;
};

/// Class passed to store_t::Rmw().
class RmwContext : public IAsyncContext {
 public:
  typedef Key key_t;
  typedef Value value_t;

  RmwContext(uint64_t key, uint64_t incr)
    : key_{ key }
    , incr_{ incr } {
  }

  /// Copy (and deep-copy) constructor.
  RmwContext(const RmwContext& other)
    : key_{ other.key_ }
    , incr_{ other.incr_ } {
  }

  /// The implicit and explicit interfaces require a key() accessor.
  inline const Key& key() const {
    return key_;
  }
  inline static constexpr uint32_t value_size() {
    return sizeof(value_t);
  }

  /// Initial, non-atomic, and atomic RMW methods.
  inline void RmwInitial(value_t& value) {
    value.value_ = incr_;
  }
  inline void RmwCopy(const value_t& old_value, value_t& value) {
    value.value_ = old_value.value_ + incr_;
  }
  inline bool RmwAtomic(value_t& value) {
    value.atomic_value_.fetch_add(incr_);
    return true;
  }

 protected:
  /// The explicit interface requires a DeepCopy_Internal() implementation.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  Key key_;
  uint64_t incr_;
};

/// Key-value store, specialized to our key and value types.
#ifdef _WIN32
typedef FASTER::environment::ThreadPoolIoHandler handler_t;
#else
typedef FASTER::environment::QueueIoHandler handler_t;
#endif
typedef FASTER::device::FileSystemDisk<handler_t, 1073741824ull> disk_t;  // 1 GiB.
typedef FasterKv<Key, Value, disk_t> store_t;

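// Workload generators: map the thread-local RNG onto the next operation.
// ycsb_a_50_50() approximates YCSB workload A's 50/50 read/update mix;
// ycsb_rmw_100() always issues a read-modify-write (its rng argument is
// unused, but keeps the signature usable as the FN template parameter of
// thread_run_benchmark()).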
inline Op ycsb_a_50_50(std::mt19937& rng) {
  if(rng() % 100 < 50) {
    return Op::Read;
  } else {
    return Op::Upsert;
  }
}

inline Op ycsb_rmw_100(std::mt19937& rng) {
  return Op::ReadModifyWrite;
}

/// Affinitize to hardware threads on the same core first, before
/// moving on to the next core.
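/// With NUMA defined, consecutive thread indices land on the two
/// hyperthreads of one physical core, alternating across the two CPUs:
/// threads 0,1 map to logical CPUs 0,28 (CPU 0, core 0) and threads 2,3
/// map to logical CPUs 1,29 (CPU 1, core 0), and so on.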
void SetThreadAffinity(size_t core) {
  // For now, assume 28 cores. (Set this correctly for your test system.)
  constexpr size_t kCoreCount = 28;
#ifdef _WIN32
  HANDLE thread_handle = ::GetCurrentThread();
  GROUP_AFFINITY group;
  group.Group = WORD(core / kCoreCount);
  group.Mask = KAFFINITY(0x1llu << (core - kCoreCount * group.Group));
  ::SetThreadGroupAffinity(thread_handle, &group, nullptr);
#else
  // On our 28-core test system, we see CPU 0, Core 0 assigned to 0, 28;
  // CPU 1, Core 0 assigned to 1, 29; etc.
  cpu_set_t mask;
  CPU_ZERO(&mask);
#ifdef NUMA
  switch(core % 4) {
  case 0:
    // 0 |-> 0
    // 4 |-> 2
    // 8 |-> 4
    core = core / 2;
    break;
  case 1:
    // 1 |-> 28
    // 5 |-> 30
    // 9 |-> 32
    core = kCoreCount + (core - 1) / 2;
    break;
  case 2:
    // 2 |-> 1
    // 6 |-> 3
    // 10 |-> 5
    core = core / 2;
    break;
  case 3:
    // 3 |-> 29
    // 7 |-> 31
    // 11 |-> 33
    core = kCoreCount + (core - 1) / 2;
    break;
  }
#else
  switch(core % 2) {
  case 0:
    // 0 |-> 0
    // 2 |-> 2
    // 4 |-> 4
    // core stays as-is.
    break;
  case 1:
    // 1 |-> 28
    // 3 |-> 30
    // 5 |-> 32
    core = (core - 1) + kCoreCount;
    break;
  }
#endif
  CPU_SET(core, &mask);

  ::sched_setaffinity(0, sizeof(mask), &mask);
#endif
}

void load_files(const std::string& load_filename, const std::string& run_filename) {
  constexpr size_t kFileChunkSize = 131072;

  auto chunk_guard = alloc_aligned<uint64_t>(512, kFileChunkSize);
  uint64_t* chunk = chunk_guard.get();

  FASTER::benchmark::File init_file{ load_filename };

  printf("loading keys from %s into memory...\n", load_filename.c_str());

  init_keys_ = alloc_aligned<uint64_t>(64, kInitCount * sizeof(uint64_t));
  uint64_t count = 0;

  uint64_t offset = 0;
  while(true) {
    uint64_t size = init_file.Read(chunk, kFileChunkSize, offset);
    for(uint64_t idx = 0; idx < size / 8; ++idx) {
      init_keys_.get()[count] = chunk[idx];
      ++count;
    }
    if(size == kFileChunkSize) {
      offset += kFileChunkSize;
    } else {
      break;
    }
  }
  if(kInitCount != count) {
    printf("Init file load fail!\n");
    exit(1);
  }

  printf("loaded %" PRIu64 " keys.\n", count);

  FASTER::benchmark::File txn_file{ run_filename };

  printf("loading txns from %s into memory...\n", run_filename.c_str());

  txn_keys_ = alloc_aligned<uint64_t>(64, kTxnCount * sizeof(uint64_t));

  count = 0;
  offset = 0;

  while(true) {
    uint64_t size = txn_file.Read(chunk, kFileChunkSize, offset);
    for(uint64_t idx = 0; idx < size / 8; ++idx) {
      txn_keys_.get()[count] = chunk[idx];
      ++count;
    }
    if(size == kFileChunkSize) {
      offset += kFileChunkSize;
    } else {
      break;
    }
  }
  if(kTxnCount != count) {
    printf("Txn file load fail!\n");
    exit(1);
  }
  printf("loaded %" PRIu64 " txns.\n", count);
}

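/// Load-phase worker: claims kChunkSize-key chunks of init_keys_ from the
/// shared atomic index and upserts one record per key, refreshing the
/// thread's epoch every kRefreshInterval operations and draining pending
/// operations every kCompletePendingInterval operations.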
void thread_setup_store(store_t* store, size_t thread_idx) {
  auto callback = [](IAsyncContext* ctxt, Status result) {
    assert(result == Status::Ok);
  };

  SetThreadAffinity(thread_idx);

  Guid guid = store->StartSession();

  uint64_t value = 42;
  for(uint64_t chunk_idx = idx_.fetch_add(kChunkSize); chunk_idx < kInitCount;
      chunk_idx = idx_.fetch_add(kChunkSize)) {
    for(uint64_t idx = chunk_idx; idx < chunk_idx + kChunkSize; ++idx) {
      if(idx % kRefreshInterval == 0) {
        store->Refresh();
        if(idx % kCompletePendingInterval == 0) {
          store->CompletePending(false);
        }
      }

      UpsertContext context{ init_keys_.get()[idx], value };
      store->Upsert(context, callback, 1);
    }
  }

  store->CompletePending(true);
  store->StopSession();
}

void setup_store(store_t* store, size_t num_threads) {
  idx_ = 0;
  std::deque<std::thread> threads;
  for(size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
    threads.emplace_back(&thread_setup_store, store, thread_idx);
  }
  for(auto& thread : threads) {
    thread.join();
  }

  init_keys_.reset();

  printf("Finished populating store: contains ?? elements.\n");
}


static std::atomic<int64_t> async_reads_done{ 0 };
static std::atomic<int64_t> async_writes_done{ 0 };

template <Op(*FN)(std::mt19937&)>
void thread_run_benchmark(store_t* store, size_t thread_idx) {
  SetThreadAffinity(thread_idx);

  std::random_device rd{};
  std::mt19937 rng{ rd() };

  auto start_time = std::chrono::high_resolution_clock::now();

  uint64_t upsert_value = 0;
  int64_t reads_done = 0;
  int64_t writes_done = 0;

  Guid guid = store->StartSession();

  while(!done_) {
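    // Claim the next chunk of the transaction trace. The thread whose
    // fetch_add returns exactly kTxnCount wraps the shared index back to
    // zero; threads that overshoot spin until the reset takes effect, so
    // the trace is replayed until the timer sets done_.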
    uint64_t chunk_idx = idx_.fetch_add(kChunkSize);
    while(chunk_idx >= kTxnCount) {
      if(chunk_idx == kTxnCount) {
        idx_ = 0;
      }
      chunk_idx = idx_.fetch_add(kChunkSize);
    }
    for(uint64_t idx = chunk_idx; idx < chunk_idx + kChunkSize; ++idx) {
      if(idx % kRefreshInterval == 0) {
        store->Refresh();
        if(idx % kCompletePendingInterval == 0) {
          store->CompletePending(false);
        }
      }
      switch(FN(rng)) {
      case Op::Insert:
      case Op::Upsert: {
        auto callback = [](IAsyncContext* ctxt, Status result) {
          CallbackContext<UpsertContext> context{ ctxt };
        };

        UpsertContext context{ txn_keys_.get()[idx], upsert_value };
        Status result = store->Upsert(context, callback, 1);
        ++writes_done;
        break;
      }
      case Op::Scan:
        printf("Scan currently not supported!\n");
        exit(1);
        break;
      case Op::Read: {
        auto callback = [](IAsyncContext* ctxt, Status result) {
          CallbackContext<ReadContext> context{ ctxt };
        };

        ReadContext context{ txn_keys_.get()[idx] };

        Status result = store->Read(context, callback, 1);
        ++reads_done;
        break;
      }
      case Op::ReadModifyWrite: {
        auto callback = [](IAsyncContext* ctxt, Status result) {
          CallbackContext<RmwContext> context{ ctxt };
        };

        RmwContext context{ txn_keys_.get()[idx], 5 };
        Status result = store->Rmw(context, callback, 1);
        if(result == Status::Ok) {
          ++writes_done;
        }
        break;
      }
      }
    }
  }

  store->CompletePending(true);
  store->StopSession();

  auto end_time = std::chrono::high_resolution_clock::now();
  std::chrono::nanoseconds duration = end_time - start_time;
  total_duration_ += duration.count();
  total_reads_done_ += reads_done;
  total_writes_done_ += writes_done;
  printf("Finished thread %" PRIu64 " : %" PRId64 " reads, %" PRId64 " writes, in %.2f seconds.\n",
         static_cast<uint64_t>(thread_idx), reads_done, writes_done,
         (double)duration.count() / kNanosPerSecond);
}

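/// Benchmark driver: spawns one worker per thread, then either sleeps for
/// the whole kRunSeconds run (when kCheckpointSeconds == 0) or wakes every
/// second and requests a checkpoint each time kCheckpointSeconds elapse,
/// before signaling done_ and joining the workers.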
template <Op(*FN)(std::mt19937&)>
void run_benchmark(store_t* store, size_t num_threads) {
  idx_ = 0;
  total_duration_ = 0;
  total_reads_done_ = 0;
  total_writes_done_ = 0;
  done_ = false;
  std::deque<std::thread> threads;
  for(size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
    threads.emplace_back(&thread_run_benchmark<FN>, store, thread_idx);
  }

  static std::atomic<uint64_t> num_checkpoints;
  num_checkpoints = 0;

  if(kCheckpointSeconds == 0) {
    std::this_thread::sleep_for(std::chrono::seconds(kRunSeconds));
  } else {
    auto callback = [](Status result, uint64_t persistent_serial_num) {
      if(result != Status::Ok) {
        printf("Thread %" PRIu32 " reports checkpoint failed.\n",
               Thread::id());
      } else {
        ++num_checkpoints;
      }
    };

    auto start_time = std::chrono::high_resolution_clock::now();
    auto last_checkpoint_time = start_time;
    auto current_time = start_time;

    uint64_t checkpoint_num = 0;

    while(current_time - start_time < std::chrono::seconds(kRunSeconds)) {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      current_time = std::chrono::high_resolution_clock::now();
      if(current_time - last_checkpoint_time >= std::chrono::seconds(kCheckpointSeconds)) {
        Guid token;
        bool success = store->Checkpoint(nullptr, callback, token);
        if(success) {
          printf("Starting checkpoint %" PRIu64 ".\n", checkpoint_num);
          ++checkpoint_num;
        } else {
          printf("Failed to start checkpoint.\n");
        }
        last_checkpoint_time = current_time;
      }
    }
  }

  // Stop the worker threads and wait for them to finish.
  done_ = true;

  for(auto& thread : threads) {
    thread.join();
  }

  printf("Finished benchmark: %" PRIu64 " thread checkpoints completed; %.2f ops/second/thread\n",
         num_checkpoints.load(),
         ((double)total_reads_done_ + (double)total_writes_done_) / ((double)total_duration_ /
             kNanosPerSecond));
}

void run(Workload workload, size_t num_threads) {
  // FASTER store has a hash table with approx. kInitCount / 2 entries and a log of size 16 GB.
  size_t init_size = next_power_of_two(kInitCount / 2);
  store_t store{ init_size, 17179869184, "storage" };

  printf("Populating the store...\n");

  setup_store(&store, num_threads);

  store.DumpDistribution();

  printf("Running benchmark on %" PRIu64 " threads...\n",
         static_cast<uint64_t>(num_threads));
  switch(workload) {
  case Workload::A_50_50:
    run_benchmark<ycsb_a_50_50>(&store, num_threads);
    break;
  case Workload::RMW_100:
    run_benchmark<ycsb_rmw_100>(&store, num_threads);
    break;
  default:
    printf("Unknown workload!\n");
    exit(1);
  }
}

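/// Entry point. Arguments: <workload> (0 = YCSB-A 50/50 reads/upserts,
/// 1 = 100% read-modify-writes), <# threads>, <load_filename>, and
/// <run_filename>; the two files hold the 8-byte keys for the load and
/// transaction phases, respectively.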
int main(int argc, char* argv[]) {
  constexpr size_t kNumArgs = 4;
  if(argc != kNumArgs + 1) {
    printf("Usage: benchmark.exe <workload> <# threads> <load_filename> <run_filename>\n");
    exit(0);
  }

  Workload workload = static_cast<Workload>(std::atol(argv[1]));
  size_t num_threads = std::atol(argv[2]);
  std::string load_filename{ argv[3] };
  std::string run_filename{ argv[4] };

  load_files(load_filename, run_filename);

  run(workload, num_threads);

  return 0;
}