// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <atomic>
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <thread>

#include "hash_bucket.h"
#include "key_hash.h"

namespace FASTER {
namespace core {

/// The hash table itself: a sized array of HashBuckets.
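/// The table size is always a power of two, so KeyHash::idx() can map a hash
/// to a bucket slot cheaply.
///
/// A minimal usage sketch (illustrative only; the concrete disk type and the
/// 2^20 sizing are assumptions, not requirements of this header):
///
///   InternalHashTable<disk_t> table;
///   table.Initialize(uint64_t{ 1 } << 20, Constants::kCacheLineBytes);
///   HashBucket& b = table.bucket(some_key_hash);  // some_key_hash: a KeyHash
///   table.Uninitialize();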
template <class D>
class InternalHashTable {
 public:
  typedef D disk_t;
  typedef typename D::file_t file_t;

  InternalHashTable()
    : size_{ 0 }
    , buckets_{ nullptr }
    , disk_{ nullptr }
    , pending_checkpoint_writes_{ 0 }
    , pending_recover_reads_{ 0 }
    , checkpoint_pending_{ false }
    , checkpoint_failed_{ false }
    , recover_pending_{ false }
    , recover_failed_{ false } {
  }

  ~InternalHashTable() {
    if(buckets_) {
      aligned_free(buckets_);
    }
  }

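  /// Allocate (or resize) the bucket array to hold new_size buckets, then zero
  /// it. new_size and alignment must be powers of two, with alignment at least
  /// one cache line.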
  inline void Initialize(uint64_t new_size, uint64_t alignment) {
    assert(new_size < INT32_MAX);
    assert(Utility::IsPowerOfTwo(new_size));
    assert(Utility::IsPowerOfTwo(alignment));
    assert(alignment >= Constants::kCacheLineBytes);
    if(size_ != new_size) {
      size_ = new_size;
      if(buckets_) {
        aligned_free(buckets_);
      }
      buckets_ = reinterpret_cast<HashBucket*>(aligned_alloc(alignment,
                 size_ * sizeof(HashBucket)));
    }
    std::memset(buckets_, 0, size_ * sizeof(HashBucket));
    assert(pending_checkpoint_writes_ == 0);
    assert(pending_recover_reads_ == 0);
    assert(checkpoint_pending_ == false);
    assert(checkpoint_failed_ == false);
    assert(recover_pending_ == false);
    assert(recover_failed_ == false);
  }

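  /// Free the bucket array and return the table to its empty state. No
  /// checkpoint or recovery may be in flight.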
  inline void Uninitialize() {
    if(buckets_) {
      aligned_free(buckets_);
      buckets_ = nullptr;
    }
    size_ = 0;
    assert(pending_checkpoint_writes_ == 0);
    assert(pending_recover_reads_ == 0);
    assert(checkpoint_pending_ == false);
    assert(checkpoint_failed_ == false);
    assert(recover_pending_ == false);
    assert(recover_failed_ == false);
  }

  /// Get the bucket specified by the hash.
  inline const HashBucket& bucket(KeyHash hash) const {
    return buckets_[hash.idx(size_)];
  }
  inline HashBucket& bucket(KeyHash hash) {
    return buckets_[hash.idx(size_)];
  }

  /// Get the bucket specified by the index. (Used by checkpoint/recovery.)
  inline const HashBucket& bucket(uint64_t idx) const {
    assert(idx < size_);
    return buckets_[idx];
  }
  /// (Used by GC and called by unit tests.)
  inline HashBucket& bucket(uint64_t idx) {
    assert(idx < size_);
    return buckets_[idx];
  }

  inline uint64_t size() const {
    return size_;
  }

  // Checkpointing and recovery.
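  // Both operations are asynchronous: Checkpoint()/Recover() issue the file
  // I/Os and return immediately; the caller then polls CheckpointComplete() /
  // RecoverComplete() until the result is no longer Status::Pending.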
  Status Checkpoint(disk_t& disk, file_t&& file, uint64_t& checkpoint_size);
  inline Status CheckpointComplete(bool wait);

  Status Recover(disk_t& disk, file_t&& file, uint64_t checkpoint_size);
  inline Status RecoverComplete(bool wait);

  void DumpDistribution(MallocFixedPageSize<HashBucket, disk_t>& overflow_buckets_allocator);

 private:
  // Checkpointing and recovery.
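  /// Context passed to the async-I/O completion callbacks; it carries a
  /// pointer back to the owning table so a callback can update the table's
  /// checkpoint/recovery state.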
  class AsyncIoContext : public IAsyncContext {
   public:
    AsyncIoContext(InternalHashTable* table_)
      : table{ table_ } {
    }
    /// The deep-copy constructor.
    AsyncIoContext(AsyncIoContext& other)
      : table{ other.table } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    InternalHashTable* table;
  };

 private:
  uint64_t size_;
  HashBucket* buckets_;

  /// State for ongoing checkpoint/recovery.
  disk_t* disk_;
  file_t file_;
  std::atomic<uint64_t> pending_checkpoint_writes_;
  std::atomic<uint64_t> pending_recover_reads_;
  std::atomic<bool> checkpoint_pending_;
  std::atomic<bool> checkpoint_failed_;
  std::atomic<bool> recover_pending_;
  std::atomic<bool> recover_failed_;
};

/// Implementations.
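/// Checkpoint writes the bucket array to `file` as Constants::kNumMergeChunks
/// equal-sized chunks, one async write per chunk. The completion callback
/// records any I/O failure; the callback for the last outstanding write closes
/// the file and clears checkpoint_pending_.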
template <class D>
Status InternalHashTable<D>::Checkpoint(disk_t& disk, file_t&& file, uint64_t& checkpoint_size) {
  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->table->checkpoint_failed_ = true;
    }
    // The last write to complete closes the file and ends the checkpoint.
    if(--context->table->pending_checkpoint_writes_ == 0) {
      result = context->table->file_.Close();
      if(result != Status::Ok) {
        context->table->checkpoint_failed_ = true;
      }
      context->table->checkpoint_pending_ = false;
    }
  };

  assert(size_ % Constants::kNumMergeChunks == 0);
  disk_ = &disk;
  file_ = std::move(file);

  checkpoint_size = 0;
  checkpoint_failed_ = false;
  uint32_t chunk_size = static_cast<uint32_t>(size_ / Constants::kNumMergeChunks);
  uint32_t write_size = static_cast<uint32_t>(chunk_size * sizeof(HashBucket));
  assert(write_size % file_.alignment() == 0);
  assert(!checkpoint_pending_);
  assert(pending_checkpoint_writes_ == 0);
  checkpoint_pending_ = true;
  pending_checkpoint_writes_ = Constants::kNumMergeChunks;
  // Issue one async write per chunk of the bucket array.
  for(uint32_t idx = 0; idx < Constants::kNumMergeChunks; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.WriteAsync(&bucket(idx * chunk_size), idx * write_size, write_size,
                                   callback, context));
  }
  checkpoint_size = size_ * sizeof(HashBucket);
  return Status::Ok;
}

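/// Poll for checkpoint completion, driving I/O completions on the calling
/// thread via disk_->TryComplete(); if `wait` is true, spin (yielding) until
/// the checkpoint finishes.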
template <class D>
inline Status InternalHashTable<D>::CheckpointComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !checkpoint_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !checkpoint_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return checkpoint_failed_ ? Status::IOError : Status::Ok;
  }
}

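/// Recover sizes the table from `checkpoint_size`, then reads the bucket
/// array back from `file` in Constants::kNumMergeChunks chunks, mirroring
/// Checkpoint().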
template <class D>
Status InternalHashTable<D>::Recover(disk_t& disk, file_t&& file, uint64_t checkpoint_size) {
  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->table->recover_failed_ = true;
    }
    // The last read to complete closes the file and ends the recovery.
    if(--context->table->pending_recover_reads_ == 0) {
      result = context->table->file_.Close();
      if(result != Status::Ok) {
        context->table->recover_failed_ = true;
      }
      context->table->recover_pending_ = false;
    }
  };

  assert(checkpoint_size > 0);
  assert(checkpoint_size % sizeof(HashBucket) == 0);
  assert(checkpoint_size % Constants::kNumMergeChunks == 0);
  disk_ = &disk;
  file_ = std::move(file);

  recover_failed_ = false;
  uint32_t read_size = static_cast<uint32_t>(checkpoint_size / Constants::kNumMergeChunks);
  uint32_t chunk_size = static_cast<uint32_t>(read_size / sizeof(HashBucket));
  assert(read_size % file_.alignment() == 0);

  // Size the table to match the checkpoint, then read it back chunk by chunk.
  Initialize(checkpoint_size / sizeof(HashBucket), file_.alignment());
  assert(!recover_pending_);
  assert(pending_recover_reads_.load() == 0);
  recover_pending_ = true;
  pending_recover_reads_ = Constants::kNumMergeChunks;
  for(uint32_t idx = 0; idx < Constants::kNumMergeChunks; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.ReadAsync(idx * read_size, &bucket(idx * chunk_size), read_size,
                                  callback, context));
  }
  return Status::Ok;
}

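/// Poll for recovery completion; if `wait` is true, spin (yielding) until all
/// reads have finished.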
template <class D>
inline Status InternalHashTable<D>::RecoverComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !recover_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !recover_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return recover_failed_ ? Status::IOError : Status::Ok;
  }
}

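/// Debugging aid: walks every bucket chain (including overflow buckets) and
/// prints a histogram of in-use entries per chain; chains with 15 or more
/// entries land in the final "15+" bin.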
template <class D>
inline void InternalHashTable<D>::DumpDistribution(
  MallocFixedPageSize<HashBucket, disk_t>& overflow_buckets_allocator) {
  uint64_t table_size = size();
  uint64_t total_record_count = 0;
  uint64_t histogram[16] = { 0 };
  for(uint64_t bucket_idx = 0; bucket_idx < table_size; ++bucket_idx) {
    const HashBucket* bucket = &buckets_[bucket_idx];
    uint64_t count = 0;
    // Count in-use entries along this bucket's overflow chain.
    while(bucket) {
      for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
        if(!bucket->entries[entry_idx].load().unused()) {
          ++count;
          ++total_record_count;
        }
      }
      HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load();
      if(overflow_entry.unused()) {
        bucket = nullptr;
      } else {
        bucket = &overflow_buckets_allocator.Get(overflow_entry.address());
      }
    }
    if(count < 15) {
      ++histogram[count];
    } else {
      ++histogram[15];
    }
  }

  printf("number of hash buckets: %" PRIu64 "\n", table_size);
  printf("total record count: %" PRIu64 "\n", total_record_count);
  printf("histogram:\n");
  for(uint32_t idx = 0; idx < 15; ++idx) {
    printf("%2u : %" PRIu64 "\n", idx, histogram[idx]);
  }
  printf("15+: %" PRIu64 "\n", histogram[15]);
}

} // namespace core
} // namespace FASTER