1 | // Copyright (c) Microsoft Corporation. All rights reserved. |
2 | // Licensed under the MIT license. |
3 | |
4 | #pragma once |
5 | |
6 | #include <atomic> |
7 | #include <cstdint> |
8 | #include <unordered_map> |
9 | #include "address.h" |
10 | #include "guid.h" |
11 | #include "malloc_fixed_page_size.h" |
12 | #include "status.h" |
13 | #include "thread.h" |
14 | |
15 | namespace FASTER { |
16 | namespace core { |
17 | |
18 | /// Checkpoint metadata for the index itself. |
19 | class IndexMetadata { |
20 | public: |
21 | IndexMetadata() |
22 | : version{ 0 } |
23 | , table_size{ 0 } |
24 | , num_ht_bytes{ 0 } |
25 | , num_ofb_bytes{ 0 } |
26 | , ofb_count{ FixedPageAddress::kInvalidAddress } |
27 | , log_begin_address{ Address::kInvalidAddress } |
28 | , checkpoint_start_address{ Address::kInvalidAddress } { |
29 | } |
30 | |
31 | inline void Initialize(uint32_t version_, uint64_t size_, Address log_begin_address_, |
32 | Address checkpoint_start_address_) { |
33 | version = version_; |
34 | table_size = size_; |
35 | log_begin_address = log_begin_address_; |
36 | checkpoint_start_address = checkpoint_start_address_; |
37 | num_ht_bytes = 0; |
38 | num_ofb_bytes = 0; |
39 | ofb_count = FixedPageAddress::kInvalidAddress; |
40 | } |
41 | inline void Reset() { |
42 | version = 0; |
43 | table_size = 0; |
44 | num_ht_bytes = 0; |
45 | num_ofb_bytes = 0; |
46 | ofb_count = FixedPageAddress::kInvalidAddress; |
47 | log_begin_address = Address::kInvalidAddress; |
48 | checkpoint_start_address = Address::kInvalidAddress; |
49 | } |
50 | |
51 | uint32_t version; |
52 | uint64_t table_size; |
53 | uint64_t num_ht_bytes; |
54 | uint64_t num_ofb_bytes; |
55 | FixedPageAddress ofb_count; |
56 | /// Earliest address that is valid for the log. |
57 | Address log_begin_address; |
58 | /// Address as of which this checkpoint was taken. |
59 | Address checkpoint_start_address; |
60 | }; |
61 | static_assert(sizeof(IndexMetadata) == 56, "sizeof(IndexMetadata) != 56" ); |
62 | |
63 | /// Checkpoint metadata, for the log. |
64 | class LogMetadata { |
65 | public: |
66 | LogMetadata() |
67 | : use_snapshot_file{ false } |
68 | , version{ UINT32_MAX } |
69 | , num_threads{ 0 } |
70 | , flushed_address{ Address::kInvalidAddress } |
71 | , final_address{ Address::kMaxAddress } { |
72 | std::memset(guids, 0, sizeof(guids)); |
73 | std::memset(monotonic_serial_nums, 0, sizeof(monotonic_serial_nums)); |
74 | } |
75 | |
76 | inline void Initialize(bool use_snapshot_file_, uint32_t version_, Address flushed_address_) { |
77 | use_snapshot_file = use_snapshot_file_; |
78 | version = version_; |
79 | num_threads = 0; |
80 | flushed_address = flushed_address_; |
81 | final_address = Address::kMaxAddress; |
82 | std::memset(guids, 0, sizeof(guids)); |
83 | std::memset(monotonic_serial_nums, 0, sizeof(monotonic_serial_nums)); |
84 | } |
85 | inline void Reset() { |
86 | Initialize(false, UINT32_MAX, Address::kInvalidAddress); |
87 | } |
88 | |
89 | bool use_snapshot_file; |
90 | uint32_t version; |
91 | std::atomic<uint32_t> num_threads; |
92 | Address flushed_address; |
93 | Address final_address; |
94 | uint64_t monotonic_serial_nums[Thread::kMaxNumThreads]; |
95 | Guid guids[Thread::kMaxNumThreads]; |
96 | }; |
97 | static_assert(sizeof(LogMetadata) == 32 + (24 * Thread::kMaxNumThreads), |
98 | "sizeof(LogMetadata) != 32 + (24 * Thread::kMaxNumThreads)" ); |
99 | |
100 | /// State of the active Checkpoint()/Recover() call, including metadata written to disk. |
101 | template <class F> |
102 | class CheckpointState { |
103 | public: |
104 | typedef F file_t; |
105 | typedef void(*index_persistence_callback_t)(Status result); |
106 | typedef void(*hybrid_log_persistence_callback_t)(Status result, uint64_t persistent_serial_num); |
107 | |
108 | CheckpointState() |
109 | : index_checkpoint_started{ false } |
110 | , failed{ false } |
111 | , flush_pending{ UINT32_MAX } |
112 | , index_persistence_callback{ nullptr } |
113 | , hybrid_log_persistence_callback{ nullptr } { |
114 | } |
115 | |
116 | void InitializeIndexCheckpoint(const Guid& token, uint32_t version, uint64_t table_size, |
117 | Address log_begin_address, Address checkpoint_start_address, |
118 | index_persistence_callback_t callback) { |
119 | failed = false; |
120 | index_checkpoint_started = false; |
121 | continue_tokens.clear(); |
122 | index_token = token; |
123 | hybrid_log_token = Guid{}; |
124 | index_metadata.Initialize(version, table_size, log_begin_address, checkpoint_start_address); |
125 | log_metadata.Reset(); |
126 | flush_pending = 0; |
127 | index_persistence_callback = callback; |
128 | hybrid_log_persistence_callback = nullptr; |
129 | } |
130 | |
131 | void InitializeHybridLogCheckpoint(const Guid& token, uint32_t version, bool use_snapshot_file, |
132 | Address flushed_until_address, |
133 | hybrid_log_persistence_callback_t callback) { |
134 | failed = false; |
135 | index_checkpoint_started = false; |
136 | continue_tokens.clear(); |
137 | index_token = Guid{}; |
138 | hybrid_log_token = token; |
139 | index_metadata.Reset(); |
140 | log_metadata.Initialize(use_snapshot_file, version, flushed_until_address); |
141 | if(use_snapshot_file) { |
142 | flush_pending = UINT32_MAX; |
143 | } else { |
144 | flush_pending = 0; |
145 | } |
146 | index_persistence_callback = nullptr; |
147 | hybrid_log_persistence_callback = callback; |
148 | } |
149 | |
150 | void InitializeCheckpoint(const Guid& token, uint32_t version, uint64_t table_size, |
151 | Address log_begin_address, Address checkpoint_start_address, |
152 | bool use_snapshot_file, Address flushed_until_address, |
153 | index_persistence_callback_t index_persistence_callback_, |
154 | hybrid_log_persistence_callback_t hybrid_log_persistence_callback_) { |
155 | failed = false; |
156 | index_checkpoint_started = false; |
157 | continue_tokens.clear(); |
158 | index_token = token; |
159 | hybrid_log_token = token; |
160 | index_metadata.Initialize(version, table_size, log_begin_address, checkpoint_start_address); |
161 | log_metadata.Initialize(use_snapshot_file, version, flushed_until_address); |
162 | if(use_snapshot_file) { |
163 | flush_pending = UINT32_MAX; |
164 | } else { |
165 | flush_pending = 0; |
166 | } |
167 | index_persistence_callback = index_persistence_callback_; |
168 | hybrid_log_persistence_callback = hybrid_log_persistence_callback_; |
169 | } |
170 | |
171 | void CheckpointDone() { |
172 | assert(!failed); |
173 | assert(index_token == Guid{} || index_checkpoint_started); |
174 | assert(continue_tokens.empty()); |
175 | assert(flush_pending == 0); |
176 | index_metadata.Reset(); |
177 | log_metadata.Reset(); |
178 | snapshot_file.Close(); |
179 | index_persistence_callback = nullptr; |
180 | hybrid_log_persistence_callback = nullptr; |
181 | } |
182 | |
183 | inline void InitializeRecover(const Guid& index_token_, const Guid& hybrid_log_token_) { |
184 | failed = false; |
185 | index_token = index_token_; |
186 | hybrid_log_token = hybrid_log_token_; |
187 | } |
188 | |
189 | void RecoverDone() { |
190 | assert(!failed); |
191 | index_metadata.Reset(); |
192 | log_metadata.Reset(); |
193 | snapshot_file.Close(); |
194 | } |
195 | |
196 | std::atomic<bool> index_checkpoint_started; |
197 | std::atomic<bool> failed; |
198 | IndexMetadata index_metadata; |
199 | LogMetadata log_metadata; |
200 | |
201 | Guid index_token; |
202 | Guid hybrid_log_token; |
203 | |
204 | /// State used when fold_over_snapshot = false. |
205 | file_t snapshot_file; |
206 | std::atomic<uint32_t> flush_pending; |
207 | |
208 | index_persistence_callback_t index_persistence_callback; |
209 | hybrid_log_persistence_callback_t hybrid_log_persistence_callback; |
210 | std::unordered_map<Guid, uint64_t> continue_tokens; |
211 | }; |
212 | |
213 | } |
214 | } // namespace FASTER::core |
215 | |
216 | |