| 1 | // Copyright (c) Microsoft Corporation. All rights reserved. |
| 2 | // Licensed under the MIT license. |
| 3 | |
| 4 | #pragma once |
| 5 | |
| 6 | #include <atomic> |
| 7 | #include <cinttypes> |
| 8 | #include <cstdint> |
| 9 | |
| 10 | #include "hash_bucket.h" |
| 11 | #include "key_hash.h" |
| 12 | |
| 13 | namespace FASTER { |
| 14 | namespace core { |
| 15 | |
/// The hash table itself: a sized array of HashBuckets.
/// Owns the (cache-aligned) bucket array and the transient state needed to
/// checkpoint it to, or recover it from, a disk file asynchronously.
template <class D>
class InternalHashTable {
 public:
  typedef D disk_t;
  typedef typename D::file_t file_t;

  /// Constructs an empty table; Initialize() must be called before use.
  InternalHashTable()
    : size_{ 0 }
    , buckets_{ nullptr }
    , disk_{ nullptr }
    , pending_checkpoint_writes_{ 0 }
    , pending_recover_reads_{ 0 }
    , checkpoint_pending_{ false }
    , checkpoint_failed_{ false }
    , recover_pending_{ false }
    , recover_failed_{ false } {
  }

  /// Frees the bucket array, if one was allocated.
  ~InternalHashTable() {
    if(buckets_) {
      aligned_free(buckets_);
    }
  }

  /// (Re)allocates the bucket array to hold `new_size` buckets, aligned to
  /// `alignment` bytes, and zero-fills it so every entry starts "unused".
  /// `new_size` and `alignment` must be powers of two; an existing
  /// allocation of the same size is reused (but still cleared).
  /// Not safe to call while a checkpoint or recovery is in flight (asserted).
  inline void Initialize(uint64_t new_size, uint64_t alignment) {
    assert(new_size < INT32_MAX);
    assert(Utility::IsPowerOfTwo(new_size));
    assert(Utility::IsPowerOfTwo(alignment));
    assert(alignment >= Constants::kCacheLineBytes);
    if(size_ != new_size) {
      size_ = new_size;
      if(buckets_) {
        aligned_free(buckets_);
      }
      // NOTE(review): a nullptr return from aligned_alloc is not checked;
      // the memset below would then dereference null -- confirm callers
      // treat allocation failure as fatal.
      buckets_ = reinterpret_cast<HashBucket*>(aligned_alloc(alignment,
                 size_ * sizeof(HashBucket)));
    }
    // Clear even when the existing allocation is reused, so all bucket
    // entries are reset to the unused state.
    std::memset(buckets_, 0, size_ * sizeof(HashBucket));
    assert(pending_checkpoint_writes_ == 0);
    assert(pending_recover_reads_ == 0);
    assert(checkpoint_pending_ == false);
    assert(checkpoint_failed_ == false);
    assert(recover_pending_ == false);
    assert(recover_failed_ == false);
  }

  /// Releases the bucket array and resets the table to the empty state.
  /// Not safe to call while a checkpoint or recovery is in flight (asserted).
  inline void Uninitialize() {
    if(buckets_) {
      aligned_free(buckets_);
      buckets_ = nullptr;
    }
    size_ = 0;
    assert(pending_checkpoint_writes_ == 0);
    assert(pending_recover_reads_ == 0);
    assert(checkpoint_pending_ == false);
    assert(checkpoint_failed_ == false);
    assert(recover_pending_ == false);
    assert(recover_failed_ == false);
  }

  /// Get the bucket specified by the hash.
  inline const HashBucket& bucket(KeyHash hash) const {
    return buckets_[hash.idx(size_)];
  }
  inline HashBucket& bucket(KeyHash hash) {
    return buckets_[hash.idx(size_)];
  }

  /// Get the bucket specified by the index. (Used by checkpoint/recovery.)
  inline const HashBucket& bucket(uint64_t idx) const {
    assert(idx < size_);
    return buckets_[idx];
  }
  /// (Used by GC and called by unit tests.)
  inline HashBucket& bucket(uint64_t idx) {
    assert(idx < size_);
    return buckets_[idx];
  }

  /// Number of (main, non-overflow) buckets in the table.
  inline uint64_t size() const {
    return size_;
  }

  // Checkpointing and recovery.
  Status Checkpoint(disk_t& disk, file_t&& file, uint64_t& checkpoint_size);
  inline Status CheckpointComplete(bool wait);

  Status Recover(disk_t& disk, file_t&& file, uint64_t checkpoint_size);
  inline Status RecoverComplete(bool wait);

  /// Prints bucket-occupancy statistics to stdout (debugging aid).
  void DumpDistribution(MallocFixedPageSize<HashBucket, disk_t>& overflow_buckets_allocator);

 private:
  // Checkpointing and recovery.
  /// Context passed to async file I/O; carries only a (non-owning) pointer
  /// back to the table so completion callbacks can update its state.
  class AsyncIoContext : public IAsyncContext {
   public:
    AsyncIoContext(InternalHashTable* table_)
      : table{ table_ } {
    }
    /// The deep-copy constructor
    AsyncIoContext(AsyncIoContext& other)
      : table{ other.table } {
    }
   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }
   public:
    InternalHashTable* table;
  };

 private:
  /// Number of buckets (always a power of two, < INT32_MAX).
  uint64_t size_;
  /// Cache-aligned bucket array of size_ elements; owned (aligned_free'd).
  HashBucket* buckets_;

  /// State for ongoing checkpoint/recovery.
  disk_t* disk_;
  file_t file_;
  /// Outstanding async chunk writes/reads; the callback that drops the count
  /// to zero closes the file and clears the matching *_pending_ flag.
  std::atomic<uint64_t> pending_checkpoint_writes_;
  std::atomic<uint64_t> pending_recover_reads_;
  std::atomic<bool> checkpoint_pending_;
  std::atomic<bool> checkpoint_failed_;
  std::atomic<bool> recover_pending_;
  std::atomic<bool> recover_failed_;
};
| 142 | |
| 143 | /// Implementations. |
/// Writes the hash table to `file` as Constants::kNumMergeChunks equal-sized
/// asynchronous writes. Returns Status::Ok once all writes have been
/// *issued*; completion is observed via CheckpointComplete(). On success,
/// `checkpoint_size` is set to the total number of bytes written.
template <class D>
Status InternalHashTable<D>::Checkpoint(disk_t& disk, file_t&& file, uint64_t& checkpoint_size) {
  // Per-chunk completion callback. Exactly one callback observes the atomic
  // counter reach zero; that one closes the file and clears the pending
  // flag (last, since CheckpointComplete() polls it).
  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->table->checkpoint_failed_ = true;
    }
    if(--context->table->pending_checkpoint_writes_ == 0) {
      result = context->table->file_.Close();
      if(result != Status::Ok) {
        context->table->checkpoint_failed_ = true;
      }
      context->table->checkpoint_pending_ = false;
    }
  };

  assert(size_ % Constants::kNumMergeChunks == 0);
  disk_ = &disk;
  file_ = std::move(file);

  checkpoint_size = 0;
  checkpoint_failed_ = false;
  uint32_t chunk_size = static_cast<uint32_t>(size_ / Constants::kNumMergeChunks);
  // NOTE(review): product is computed in 64 bits then truncated to 32; for
  // tables near the INT32_MAX bucket limit, chunk_size * sizeof(HashBucket)
  // could exceed 4 GiB -- confirm upstream size limits prevent this.
  uint32_t write_size = static_cast<uint32_t>(chunk_size * sizeof(HashBucket));
  assert(write_size % file_.alignment() == 0);
  assert(!checkpoint_pending_);
  assert(pending_checkpoint_writes_ == 0);
  // Publish the full pending count *before* issuing any write, so callbacks
  // that fire immediately cannot see a partially-initialized count.
  checkpoint_pending_ = true;
  pending_checkpoint_writes_ = Constants::kNumMergeChunks;
  for(uint32_t idx = 0; idx < Constants::kNumMergeChunks; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.WriteAsync(&bucket(idx * chunk_size), idx * write_size, write_size,
                                   callback, context));
  }
  checkpoint_size = size_ * sizeof(HashBucket);
  return Status::Ok;
}
| 181 | |
| 182 | template <class D> |
| 183 | inline Status InternalHashTable<D>::CheckpointComplete(bool wait) { |
| 184 | disk_->TryComplete(); |
| 185 | bool complete = !checkpoint_pending_.load(); |
| 186 | while(wait && !complete) { |
| 187 | disk_->TryComplete(); |
| 188 | complete = !checkpoint_pending_.load(); |
| 189 | std::this_thread::yield(); |
| 190 | } |
| 191 | if(!complete) { |
| 192 | return Status::Pending; |
| 193 | } else { |
| 194 | return checkpoint_failed_ ? Status::IOError : Status::Ok; |
| 195 | } |
| 196 | } |
| 197 | |
/// Reads a checkpointed hash table back from `file` as
/// Constants::kNumMergeChunks equal-sized asynchronous reads, first resizing
/// the table to fit `checkpoint_size` bytes. Returns Status::Ok once all
/// reads have been *issued*; completion is observed via RecoverComplete().
template <class D>
Status InternalHashTable<D>::Recover(disk_t& disk, file_t&& file, uint64_t checkpoint_size) {
  // Per-chunk completion callback. Exactly one callback observes the atomic
  // counter reach zero; that one closes the file and clears the pending
  // flag (last, since RecoverComplete() polls it).
  auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
    CallbackContext<AsyncIoContext> context{ ctxt };
    if(result != Status::Ok) {
      context->table->recover_failed_ = true;
    }
    if(--context->table->pending_recover_reads_ == 0) {
      result = context->table->file_.Close();
      if(result != Status::Ok) {
        context->table->recover_failed_ = true;
      }
      context->table->recover_pending_ = false;
    }
  };

  assert(checkpoint_size > 0);
  assert(checkpoint_size % sizeof(HashBucket) == 0);
  assert(checkpoint_size % Constants::kNumMergeChunks == 0);
  disk_ = &disk;
  file_ = std::move(file);

  recover_failed_ = false;
  uint32_t read_size = static_cast<uint32_t>(checkpoint_size / Constants::kNumMergeChunks);
  uint32_t chunk_size = static_cast<uint32_t>(read_size / sizeof(HashBucket));
  assert(read_size % file_.alignment() == 0);

  // Size the table to exactly match the checkpoint being restored.
  Initialize(checkpoint_size / sizeof(HashBucket), file_.alignment());
  assert(!recover_pending_);
  assert(pending_recover_reads_.load() == 0);
  // Publish the full pending count *before* issuing any read, so callbacks
  // that fire immediately cannot see a partially-initialized count.
  recover_pending_ = true;
  pending_recover_reads_ = Constants::kNumMergeChunks;
  for(uint32_t idx = 0; idx < Constants::kNumMergeChunks; ++idx) {
    AsyncIoContext context{ this };
    RETURN_NOT_OK(file_.ReadAsync(idx * read_size, &bucket(idx * chunk_size), read_size,
                                  callback, context));
  }
  return Status::Ok;
}
| 237 | |
| 238 | template <class D> |
| 239 | inline Status InternalHashTable<D>::RecoverComplete(bool wait) { |
| 240 | disk_->TryComplete(); |
| 241 | bool complete = !recover_pending_.load(); |
| 242 | while(wait && !complete) { |
| 243 | disk_->TryComplete(); |
| 244 | complete = !recover_pending_.load(); |
| 245 | std::this_thread::yield(); |
| 246 | } |
| 247 | if(!complete) { |
| 248 | return Status::Pending; |
| 249 | } else { |
| 250 | return recover_failed_ ? Status::IOError : Status::Ok; |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | template <class D> |
| 255 | inline void InternalHashTable<D>::DumpDistribution( |
| 256 | MallocFixedPageSize<HashBucket, disk_t>& overflow_buckets_allocator) { |
| 257 | uint64_t table_size = size(); |
| 258 | uint64_t total_record_count = 0; |
| 259 | uint64_t histogram[16] = { 0 }; |
| 260 | for(uint64_t bucket_idx = 0; bucket_idx < table_size; ++bucket_idx) { |
| 261 | const HashBucket* bucket = &buckets_[bucket_idx]; |
| 262 | uint64_t count = 0; |
| 263 | while(bucket) { |
| 264 | for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) { |
| 265 | if(!bucket->entries[entry_idx].load().unused()) { |
| 266 | ++count; |
| 267 | ++total_record_count; |
| 268 | } |
| 269 | } |
| 270 | HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load(); |
| 271 | if(overflow_entry.unused()) { |
| 272 | bucket = nullptr; |
| 273 | } else { |
| 274 | bucket = &overflow_buckets_allocator.Get(overflow_entry.address()); |
| 275 | } |
| 276 | } |
| 277 | if(count < 15) { |
| 278 | ++histogram[count]; |
| 279 | } else { |
| 280 | ++histogram[15]; |
| 281 | } |
| 282 | } |
| 283 | |
| 284 | printf("number of hash buckets: %" PRIu64 "\n" , table_size); |
| 285 | printf("total record count: %" PRIu64 "\n" , total_record_count); |
| 286 | printf("histogram:\n" ); |
| 287 | for(uint8_t idx = 0; idx < 15; ++idx) { |
| 288 | printf("%2u : %" PRIu64 "\n" , idx, histogram[idx]); |
| 289 | } |
| 290 | printf("15+: %" PRIu64 "\n" , histogram[15]); |
| 291 | } |
| 292 | |
| 293 | } |
| 294 | } // namespace FASTER::core |
| 295 | |