1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
6#include <atomic>
7#include <cstdint>
8#include <unordered_map>
9#include "address.h"
10#include "guid.h"
11#include "malloc_fixed_page_size.h"
12#include "status.h"
13#include "thread.h"
14
15namespace FASTER {
16namespace core {
17
18/// Checkpoint metadata for the index itself.
19class IndexMetadata {
20 public:
21 IndexMetadata()
22 : version{ 0 }
23 , table_size{ 0 }
24 , num_ht_bytes{ 0 }
25 , num_ofb_bytes{ 0 }
26 , ofb_count{ FixedPageAddress::kInvalidAddress }
27 , log_begin_address{ Address::kInvalidAddress }
28 , checkpoint_start_address{ Address::kInvalidAddress } {
29 }
30
31 inline void Initialize(uint32_t version_, uint64_t size_, Address log_begin_address_,
32 Address checkpoint_start_address_) {
33 version = version_;
34 table_size = size_;
35 log_begin_address = log_begin_address_;
36 checkpoint_start_address = checkpoint_start_address_;
37 num_ht_bytes = 0;
38 num_ofb_bytes = 0;
39 ofb_count = FixedPageAddress::kInvalidAddress;
40 }
41 inline void Reset() {
42 version = 0;
43 table_size = 0;
44 num_ht_bytes = 0;
45 num_ofb_bytes = 0;
46 ofb_count = FixedPageAddress::kInvalidAddress;
47 log_begin_address = Address::kInvalidAddress;
48 checkpoint_start_address = Address::kInvalidAddress;
49 }
50
51 uint32_t version;
52 uint64_t table_size;
53 uint64_t num_ht_bytes;
54 uint64_t num_ofb_bytes;
55 FixedPageAddress ofb_count;
56 /// Earliest address that is valid for the log.
57 Address log_begin_address;
58 /// Address as of which this checkpoint was taken.
59 Address checkpoint_start_address;
60};
61static_assert(sizeof(IndexMetadata) == 56, "sizeof(IndexMetadata) != 56");
62
63/// Checkpoint metadata, for the log.
64class LogMetadata {
65 public:
66 LogMetadata()
67 : use_snapshot_file{ false }
68 , version{ UINT32_MAX }
69 , num_threads{ 0 }
70 , flushed_address{ Address::kInvalidAddress }
71 , final_address{ Address::kMaxAddress } {
72 std::memset(guids, 0, sizeof(guids));
73 std::memset(monotonic_serial_nums, 0, sizeof(monotonic_serial_nums));
74 }
75
76 inline void Initialize(bool use_snapshot_file_, uint32_t version_, Address flushed_address_) {
77 use_snapshot_file = use_snapshot_file_;
78 version = version_;
79 num_threads = 0;
80 flushed_address = flushed_address_;
81 final_address = Address::kMaxAddress;
82 std::memset(guids, 0, sizeof(guids));
83 std::memset(monotonic_serial_nums, 0, sizeof(monotonic_serial_nums));
84 }
85 inline void Reset() {
86 Initialize(false, UINT32_MAX, Address::kInvalidAddress);
87 }
88
89 bool use_snapshot_file;
90 uint32_t version;
91 std::atomic<uint32_t> num_threads;
92 Address flushed_address;
93 Address final_address;
94 uint64_t monotonic_serial_nums[Thread::kMaxNumThreads];
95 Guid guids[Thread::kMaxNumThreads];
96};
97static_assert(sizeof(LogMetadata) == 32 + (24 * Thread::kMaxNumThreads),
98 "sizeof(LogMetadata) != 32 + (24 * Thread::kMaxNumThreads)");
99
100/// State of the active Checkpoint()/Recover() call, including metadata written to disk.
101template <class F>
102class CheckpointState {
103 public:
104 typedef F file_t;
105 typedef void(*index_persistence_callback_t)(Status result);
106 typedef void(*hybrid_log_persistence_callback_t)(Status result, uint64_t persistent_serial_num);
107
108 CheckpointState()
109 : index_checkpoint_started{ false }
110 , failed{ false }
111 , flush_pending{ UINT32_MAX }
112 , index_persistence_callback{ nullptr }
113 , hybrid_log_persistence_callback{ nullptr } {
114 }
115
116 void InitializeIndexCheckpoint(const Guid& token, uint32_t version, uint64_t table_size,
117 Address log_begin_address, Address checkpoint_start_address,
118 index_persistence_callback_t callback) {
119 failed = false;
120 index_checkpoint_started = false;
121 continue_tokens.clear();
122 index_token = token;
123 hybrid_log_token = Guid{};
124 index_metadata.Initialize(version, table_size, log_begin_address, checkpoint_start_address);
125 log_metadata.Reset();
126 flush_pending = 0;
127 index_persistence_callback = callback;
128 hybrid_log_persistence_callback = nullptr;
129 }
130
131 void InitializeHybridLogCheckpoint(const Guid& token, uint32_t version, bool use_snapshot_file,
132 Address flushed_until_address,
133 hybrid_log_persistence_callback_t callback) {
134 failed = false;
135 index_checkpoint_started = false;
136 continue_tokens.clear();
137 index_token = Guid{};
138 hybrid_log_token = token;
139 index_metadata.Reset();
140 log_metadata.Initialize(use_snapshot_file, version, flushed_until_address);
141 if(use_snapshot_file) {
142 flush_pending = UINT32_MAX;
143 } else {
144 flush_pending = 0;
145 }
146 index_persistence_callback = nullptr;
147 hybrid_log_persistence_callback = callback;
148 }
149
150 void InitializeCheckpoint(const Guid& token, uint32_t version, uint64_t table_size,
151 Address log_begin_address, Address checkpoint_start_address,
152 bool use_snapshot_file, Address flushed_until_address,
153 index_persistence_callback_t index_persistence_callback_,
154 hybrid_log_persistence_callback_t hybrid_log_persistence_callback_) {
155 failed = false;
156 index_checkpoint_started = false;
157 continue_tokens.clear();
158 index_token = token;
159 hybrid_log_token = token;
160 index_metadata.Initialize(version, table_size, log_begin_address, checkpoint_start_address);
161 log_metadata.Initialize(use_snapshot_file, version, flushed_until_address);
162 if(use_snapshot_file) {
163 flush_pending = UINT32_MAX;
164 } else {
165 flush_pending = 0;
166 }
167 index_persistence_callback = index_persistence_callback_;
168 hybrid_log_persistence_callback = hybrid_log_persistence_callback_;
169 }
170
171 void CheckpointDone() {
172 assert(!failed);
173 assert(index_token == Guid{} || index_checkpoint_started);
174 assert(continue_tokens.empty());
175 assert(flush_pending == 0);
176 index_metadata.Reset();
177 log_metadata.Reset();
178 snapshot_file.Close();
179 index_persistence_callback = nullptr;
180 hybrid_log_persistence_callback = nullptr;
181 }
182
183 inline void InitializeRecover(const Guid& index_token_, const Guid& hybrid_log_token_) {
184 failed = false;
185 index_token = index_token_;
186 hybrid_log_token = hybrid_log_token_;
187 }
188
189 void RecoverDone() {
190 assert(!failed);
191 index_metadata.Reset();
192 log_metadata.Reset();
193 snapshot_file.Close();
194 }
195
196 std::atomic<bool> index_checkpoint_started;
197 std::atomic<bool> failed;
198 IndexMetadata index_metadata;
199 LogMetadata log_metadata;
200
201 Guid index_token;
202 Guid hybrid_log_token;
203
204 /// State used when fold_over_snapshot = false.
205 file_t snapshot_file;
206 std::atomic<uint32_t> flush_pending;
207
208 index_persistence_callback_t index_persistence_callback;
209 hybrid_log_persistence_callback_t hybrid_log_persistence_callback;
210 std::unordered_map<Guid, uint64_t> continue_tokens;
211};
212
213}
214} // namespace FASTER::core
215
216