// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include <atomic>
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <thread>

#include "hash_bucket.h"
#include "key_hash.h"

namespace FASTER {
namespace core {

/// The hash table itself: a sized array of HashBuckets.
template <class D>
class InternalHashTable {
 public:
  typedef D disk_t;
  typedef typename D::file_t file_t;

  InternalHashTable()
    : size_{ 0 }
    , buckets_{ nullptr }
    , disk_{ nullptr }
    , pending_checkpoint_writes_{ 0 }
    , pending_recover_reads_{ 0 }
    , checkpoint_pending_{ false }
    , checkpoint_failed_{ false }
    , recover_pending_{ false }
    , recover_failed_{ false } {
  }

  ~InternalHashTable() {
    if(buckets_) {
      aligned_free(buckets_);
    }
  }

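  /// (Re)allocate and zero the bucket array. Per the asserts below, `new_size` must be a
  /// power of two (and smaller than INT32_MAX), and `alignment` a power of two no smaller
  /// than a cache line. Illustrative call on some instance `table` (values are examples
  /// only, not defaults mandated by this header):
  ///   table.Initialize(1ull << 20, Constants::kCacheLineBytes);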
  inline void Initialize(uint64_t new_size, uint64_t alignment) {
    assert(new_size < INT32_MAX);
    assert(Utility::IsPowerOfTwo(new_size));
    assert(Utility::IsPowerOfTwo(alignment));
    assert(alignment >= Constants::kCacheLineBytes);
    if(size_ != new_size) {
      size_ = new_size;
      if(buckets_) {
        aligned_free(buckets_);
      }
      buckets_ = reinterpret_cast<HashBucket*>(aligned_alloc(alignment,
                 size_ * sizeof(HashBucket)));
    }
    std::memset(buckets_, 0, size_ * sizeof(HashBucket));
    assert(pending_checkpoint_writes_ == 0);
    assert(pending_recover_reads_ == 0);
    assert(checkpoint_pending_ == false);
    assert(checkpoint_failed_ == false);
    assert(recover_pending_ == false);
    assert(recover_failed_ == false);
  }

  inline void Uninitialize() {
    if(buckets_) {
      aligned_free(buckets_);
      buckets_ = nullptr;
    }
    size_ = 0;
    assert(pending_checkpoint_writes_ == 0);
    assert(pending_recover_reads_ == 0);
    assert(checkpoint_pending_ == false);
    assert(checkpoint_failed_ == false);
    assert(recover_pending_ == false);
    assert(recover_failed_ == false);
  }

  /// Get the bucket specified by the hash.
  inline const HashBucket& bucket(KeyHash hash) const {
    return buckets_[hash.idx(size_)];
  }
  inline HashBucket& bucket(KeyHash hash) {
    return buckets_[hash.idx(size_)];
  }

  /// Get the bucket specified by the index. (Used by checkpoint/recovery.)
  inline const HashBucket& bucket(uint64_t idx) const {
    assert(idx < size_);
    return buckets_[idx];
  }
  /// (Used by GC and called by unit tests.)
  inline HashBucket& bucket(uint64_t idx) {
    assert(idx < size_);
    return buckets_[idx];
  }

  inline uint64_t size() const {
    return size_;
  }

  // Checkpointing and recovery.
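  /// Write the entire bucket array to `file`, split across Constants::kNumMergeChunks
  /// asynchronous writes; on success, `checkpoint_size` is set to the total number of
  /// bytes written. Poll or wait for completion (and the final status) via
  /// CheckpointComplete().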
  Status Checkpoint(disk_t& disk, file_t&& file, uint64_t& checkpoint_size);
  inline Status CheckpointComplete(bool wait);

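  /// Reload the bucket array from a checkpoint `file` of `checkpoint_size` bytes, issuing
  /// Constants::kNumMergeChunks asynchronous reads; the table is re-Initialize()d to hold
  /// checkpoint_size / sizeof(HashBucket) buckets. Poll or wait for completion via
  /// RecoverComplete().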
  Status Recover(disk_t& disk, file_t&& file, uint64_t checkpoint_size);
  inline Status RecoverComplete(bool wait);

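  /// Print, to stdout, the number of buckets, the total record count, and a histogram of
  /// records per bucket chain (following overflow buckets via `overflow_buckets_allocator`).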
  void DumpDistribution(MallocFixedPageSize<HashBucket, disk_t>& overflow_buckets_allocator);

 private:
  // Checkpointing and recovery.
  class AsyncIoContext : public IAsyncContext {
   public:
    AsyncIoContext(InternalHashTable* table_)
      : table{ table_ } {
    }
    /// The deep-copy constructor
    AsyncIoContext(AsyncIoContext& other)
      : table{ other.table } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    InternalHashTable* table;
  };

 private:
  uint64_t size_;
  HashBucket* buckets_;

  /// State for ongoing checkpoint/recovery.
  disk_t* disk_;
  file_t file_;
  std::atomic<uint64_t> pending_checkpoint_writes_;
  std::atomic<uint64_t> pending_recover_reads_;
  std::atomic<bool> checkpoint_pending_;
  std::atomic<bool> checkpoint_failed_;
  std::atomic<bool> recover_pending_;
  std::atomic<bool> recover_failed_;
};
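
/// Usage sketch for the checkpoint/recovery path. Illustrative only: `table`, `disk`,
/// `file`, and `recover_file` stand for whatever instances the caller has already set up
/// (their construction is environment-specific and not shown here); RETURN_NOT_OK is the
/// status-propagation macro used throughout this file.
///
///   uint64_t checkpoint_size;
///   RETURN_NOT_OK(table.Checkpoint(disk, std::move(file), checkpoint_size));
///   while(table.CheckpointComplete(false) == Status::Pending) {
///     // I/O completions are driven inside CheckpointComplete() via disk_->TryComplete().
///     std::this_thread::yield();
///   }
///
///   // Later, to restore from the same checkpoint:
///   RETURN_NOT_OK(table.Recover(disk, std::move(recover_file), checkpoint_size));
///   RETURN_NOT_OK(table.RecoverComplete(true));  // Blocks until all reads finish.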

/// Implementations.
template <class D>
Status InternalHashTable<D>::Checkpoint(disk_t& disk, file_t&& file, uint64_t& checkpoint_size) {
  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->table->checkpoint_failed_ = true;
    }
    if(--context->table->pending_checkpoint_writes_ == 0) {
      // Last write: close the file and mark the checkpoint as complete.
      result = context->table->file_.Close();
      if(result != Status::Ok) {
        context->table->checkpoint_failed_ = true;
      }
      context->table->checkpoint_pending_ = false;
    }
  };

  assert(size_ % Constants::kNumMergeChunks == 0);
  disk_ = &disk;
  file_ = std::move(file);

  checkpoint_size = 0;
  checkpoint_failed_ = false;
  uint32_t chunk_size = static_cast<uint32_t>(size_ / Constants::kNumMergeChunks);
  uint32_t write_size = static_cast<uint32_t>(chunk_size * sizeof(HashBucket));
  assert(write_size % file_.alignment() == 0);
  assert(!checkpoint_pending_);
  assert(pending_checkpoint_writes_ == 0);
  checkpoint_pending_ = true;
  pending_checkpoint_writes_ = Constants::kNumMergeChunks;
  for(uint32_t idx = 0; idx < Constants::kNumMergeChunks; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.WriteAsync(&bucket(idx * chunk_size), idx * write_size, write_size,
                                   callback, context));
  }
  checkpoint_size = size_ * sizeof(HashBucket);
  return Status::Ok;
}

template <class D>
inline Status InternalHashTable<D>::CheckpointComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !checkpoint_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !checkpoint_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return checkpoint_failed_ ? Status::IOError : Status::Ok;
  }
}

template <class D>
Status InternalHashTable<D>::Recover(disk_t& disk, file_t&& file, uint64_t checkpoint_size) {
  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->table->recover_failed_ = true;
    }
    if(--context->table->pending_recover_reads_ == 0) {
      // Last read: close the file and mark recovery as complete.
      result = context->table->file_.Close();
      if(result != Status::Ok) {
        context->table->recover_failed_ = true;
      }
      context->table->recover_pending_ = false;
    }
  };

  assert(checkpoint_size > 0);
  assert(checkpoint_size % sizeof(HashBucket) == 0);
  assert(checkpoint_size % Constants::kNumMergeChunks == 0);
  disk_ = &disk;
  file_ = std::move(file);

  recover_failed_ = false;
  uint32_t read_size = static_cast<uint32_t>(checkpoint_size / Constants::kNumMergeChunks);
  uint32_t chunk_size = static_cast<uint32_t>(read_size / sizeof(HashBucket));
  assert(read_size % file_.alignment() == 0);

  // Re-create the in-memory table at the size implied by the checkpoint.
  Initialize(checkpoint_size / sizeof(HashBucket), file_.alignment());
  assert(!recover_pending_);
  assert(pending_recover_reads_.load() == 0);
  recover_pending_ = true;
  pending_recover_reads_ = Constants::kNumMergeChunks;
  for(uint32_t idx = 0; idx < Constants::kNumMergeChunks; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.ReadAsync(idx * read_size, &bucket(idx * chunk_size), read_size,
                                  callback, context));
  }
  return Status::Ok;
}

template <class D>
inline Status InternalHashTable<D>::RecoverComplete(bool wait) {
  disk_->TryComplete();
  bool complete = !recover_pending_.load();
  while(wait && !complete) {
    disk_->TryComplete();
    complete = !recover_pending_.load();
    std::this_thread::yield();
  }
  if(!complete) {
    return Status::Pending;
  } else {
    return recover_failed_ ? Status::IOError : Status::Ok;
  }
}

template <class D>
inline void InternalHashTable<D>::DumpDistribution(
  MallocFixedPageSize<HashBucket, disk_t>& overflow_buckets_allocator) {
  uint64_t table_size = size();
  uint64_t total_record_count = 0;
  uint64_t histogram[16] = { 0 };
  for(uint64_t bucket_idx = 0; bucket_idx < table_size; ++bucket_idx) {
    const HashBucket* bucket = &buckets_[bucket_idx];
    uint64_t count = 0;
    // Count used entries in this bucket and in all of its overflow buckets.
    while(bucket) {
      for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
        if(!bucket->entries[entry_idx].load().unused()) {
          ++count;
          ++total_record_count;
        }
      }
      HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load();
      if(overflow_entry.unused()) {
        bucket = nullptr;
      } else {
        bucket = &overflow_buckets_allocator.Get(overflow_entry.address());
      }
    }
    // Chains with 15 or more records all land in the final histogram bin.
    if(count < 15) {
      ++histogram[count];
    } else {
      ++histogram[15];
    }
  }

  printf("number of hash buckets: %" PRIu64 "\n", table_size);
  printf("total record count: %" PRIu64 "\n", total_record_count);
  printf("histogram:\n");
  for(uint8_t idx = 0; idx < 15; ++idx) {
    printf("%2u : %" PRIu64 "\n", idx, histogram[idx]);
  }
  printf("15+: %" PRIu64 "\n", histogram[15]);
}

} // namespace core
} // namespace FASTER
295 | |