1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
6#include <atomic>
7#include <cassert>
8#include <cstdint>
9#include <cstring>
10#include <deque>
11#include <unordered_map>
12#include <string>
13#include "address.h"
14#include "guid.h"
15#include "hash_bucket.h"
16#include "native_buffer_pool.h"
17#include "record.h"
18#include "state_transitions.h"
19#include "thread.h"
20
21namespace FASTER {
22namespace core {
23
24/// Internal contexts, used by FASTER.
25
/// Type tag recording which user-visible operation a pending context represents.
enum class OperationType : uint8_t {
  Read,
  RMW,
  Upsert,
  Insert,
  Delete
};
33
/// Internal status codes for one attempt at an operation; drives FASTER's retry and
/// async-continuation handling.
enum class OperationStatus : uint8_t {
  /// The attempt completed successfully.
  SUCCESS,
  /// The key was not found.
  NOT_FOUND,
  /// The attempt failed transiently; retry immediately.
  RETRY_NOW,
  /// The attempt failed transiently; retry later.
  RETRY_LATER,
  /// The record is not in memory; an async disk read is required to continue.
  RECORD_ON_DISK,
  /// Like SUCCESS / NOT_FOUND, plus -- presumably -- an "unmark" side condition on the
  /// hash-bucket entry; NOTE(review): confirm exact semantics against the hash-table code.
  SUCCESS_UNMARK,
  NOT_FOUND_UNMARK,
  /// A CPR (checkpoint) phase/version shift was observed during the attempt.
  CPR_SHIFT_DETECTED
};
44
45/// Internal FASTER context.
46template <class K>
47class PendingContext : public IAsyncContext {
48 public:
49 typedef K key_t;
50
51 protected:
52 PendingContext(OperationType type_, IAsyncContext& caller_context_,
53 AsyncCallback caller_callback_)
54 : type{ type_ }
55 , caller_context{ &caller_context_ }
56 , caller_callback{ caller_callback_ }
57 , version{ UINT32_MAX }
58 , phase{ Phase::INVALID }
59 , result{ Status::Pending }
60 , address{ Address::kInvalidAddress }
61 , entry{ HashBucketEntry::kInvalidEntry } {
62 }
63
64 public:
65 /// The deep-copy constructor.
66 PendingContext(const PendingContext& other, IAsyncContext* caller_context_)
67 : type{ other.type }
68 , caller_context{ caller_context_ }
69 , caller_callback{ other.caller_callback }
70 , version{ other.version }
71 , phase{ other.phase }
72 , result{ other.result }
73 , address{ other.address }
74 , entry{ other.entry } {
75 }
76
77 public:
78 /// Go async, for the first time.
79 void go_async(Phase phase_, uint32_t version_, Address address_, HashBucketEntry entry_) {
80 phase = phase_;
81 version = version_;
82 address = address_;
83 entry = entry_;
84 }
85
86 /// Go async, again.
87 void continue_async(Address address_, HashBucketEntry entry_) {
88 address = address_;
89 entry = entry_;
90 }
91
92 virtual const key_t& key() const = 0;
93
94 /// Caller context.
95 IAsyncContext* caller_context;
96 /// Caller callback.
97 AsyncCallback caller_callback;
98 /// Checkpoint version.
99 uint32_t version;
100 /// Checkpoint phase.
101 Phase phase;
102 /// Type of operation (Read, Upsert, RMW, etc.).
103 OperationType type;
104 /// Result of operation.
105 Status result;
106 /// Address of the record being read or modified.
107 Address address;
108 /// Hash table entry that (indirectly) leads to the record being read or modified.
109 HashBucketEntry entry;
110};
111
112/// FASTER's internal Read() context.
113
/// An internal Read() context that has gone async and lost its type information.
template <class K>
class AsyncPendingReadContext : public PendingContext<K> {
 public:
  typedef K key_t;
 protected:
  AsyncPendingReadContext(IAsyncContext& caller_context_, AsyncCallback caller_callback_)
    : PendingContext<key_t>(OperationType::Read, caller_context_, caller_callback_) {
  }
  /// The deep copy constructor.
  AsyncPendingReadContext(AsyncPendingReadContext& other, IAsyncContext* caller_context)
    : PendingContext<key_t>(other, caller_context) {
  }
 public:
  /// Type-erased dispatch to the user context's Get(); 'rec' points to the typed record.
  virtual void Get(const void* rec) = 0;
  /// Type-erased dispatch to the user context's GetAtomic() (the concurrent-read variant).
  virtual void GetAtomic(const void* rec) = 0;
};
131
132/// A synchronous Read() context preserves its type information.
133template <class RC>
134class PendingReadContext : public AsyncPendingReadContext<typename RC::key_t> {
135 public:
136 typedef RC read_context_t;
137 typedef typename read_context_t::key_t key_t;
138 typedef typename read_context_t::value_t value_t;
139 typedef Record<key_t, value_t> record_t;
140
141 PendingReadContext(read_context_t& caller_context_, AsyncCallback caller_callback_)
142 : AsyncPendingReadContext<key_t>(caller_context_, caller_callback_) {
143 }
144 /// The deep copy constructor.
145 PendingReadContext(PendingReadContext& other, IAsyncContext* caller_context_)
146 : AsyncPendingReadContext<key_t>(other, caller_context_) {
147 }
148 protected:
149 Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
150 return IAsyncContext::DeepCopy_Internal(*this, PendingContext<key_t>::caller_context,
151 context_copy);
152 }
153 private:
154 inline const read_context_t& read_context() const {
155 return *static_cast<const read_context_t*>(PendingContext<key_t>::caller_context);
156 }
157 inline read_context_t& read_context() {
158 return *static_cast<read_context_t*>(PendingContext<key_t>::caller_context);
159 }
160 public:
161 /// Accessors.
162 inline const key_t& key() const final {
163 return read_context().key();
164 }
165 inline void Get(const void* rec) final {
166 const record_t* record = reinterpret_cast<const record_t*>(rec);
167 read_context().Get(record->value());
168 }
169 inline void GetAtomic(const void* rec) final {
170 const record_t* record = reinterpret_cast<const record_t*>(rec);
171 read_context().GetAtomic(record->value());
172 }
173};
174
175/// FASTER's internal Upsert() context.
176
/// An internal Upsert() context that has gone async and lost its type information.
template <class K>
class AsyncPendingUpsertContext : public PendingContext<K> {
 public:
  typedef K key_t;
 protected:
  AsyncPendingUpsertContext(IAsyncContext& caller_context_, AsyncCallback caller_callback_)
    : PendingContext<key_t>(OperationType::Upsert, caller_context_, caller_callback_) {
  }
  /// The deep copy constructor.
  AsyncPendingUpsertContext(AsyncPendingUpsertContext& other, IAsyncContext* caller_context)
    : PendingContext<key_t>(other, caller_context) {
  }
 public:
  /// Type-erased dispatch to the user context's Put(); 'rec' points to the typed record.
  virtual void Put(void* rec) = 0;
  /// Type-erased dispatch to the user context's PutAtomic(), the in-place-update variant;
  /// returns whatever the user's PutAtomic() returns.
  virtual bool PutAtomic(void* rec) = 0;
  /// Size, in bytes, of the value to be written -- presumably used when sizing the new
  /// record; NOTE(review): confirm at call sites.
  virtual uint32_t value_size() const = 0;
};
195
196/// A synchronous Upsert() context preserves its type information.
197template <class UC>
198class PendingUpsertContext : public AsyncPendingUpsertContext<typename UC::key_t> {
199 public:
200 typedef UC upsert_context_t;
201 typedef typename upsert_context_t::key_t key_t;
202 typedef typename upsert_context_t::value_t value_t;
203 typedef Record<key_t, value_t> record_t;
204
205 PendingUpsertContext(upsert_context_t& caller_context_, AsyncCallback caller_callback_)
206 : AsyncPendingUpsertContext<key_t>(caller_context_, caller_callback_) {
207 }
208 /// The deep copy constructor.
209 PendingUpsertContext(PendingUpsertContext& other, IAsyncContext* caller_context_)
210 : AsyncPendingUpsertContext<key_t>(other, caller_context_) {
211 }
212 protected:
213 Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
214 return IAsyncContext::DeepCopy_Internal(*this, PendingContext<key_t>::caller_context,
215 context_copy);
216 }
217 private:
218 inline const upsert_context_t& upsert_context() const {
219 return *static_cast<const upsert_context_t*>(PendingContext<key_t>::caller_context);
220 }
221 inline upsert_context_t& upsert_context() {
222 return *static_cast<upsert_context_t*>(PendingContext<key_t>::caller_context);
223 }
224 public:
225 /// Accessors.
226 inline const key_t& key() const final {
227 return upsert_context().key();
228 }
229 inline void Put(void* rec) final {
230 record_t* record = reinterpret_cast<record_t*>(rec);
231 upsert_context().Put(record->value());
232 }
233 inline bool PutAtomic(void* rec) final {
234 record_t* record = reinterpret_cast<record_t*>(rec);
235 return upsert_context().PutAtomic(record->value());
236 }
237 inline constexpr uint32_t value_size() const final {
238 return upsert_context().value_size();
239 }
240};
241
/// FASTER's internal Rmw() context.
/// An internal Rmw() context that has gone async and lost its type information.
template <class K>
class AsyncPendingRmwContext : public PendingContext<K> {
 public:
  typedef K key_t;
 protected:
  AsyncPendingRmwContext(IAsyncContext& caller_context_, AsyncCallback caller_callback_)
    : PendingContext<key_t>(OperationType::RMW, caller_context_, caller_callback_) {
  }
  /// The deep copy constructor.
  AsyncPendingRmwContext(AsyncPendingRmwContext& other, IAsyncContext* caller_context)
    : PendingContext<key_t>(other, caller_context) {
  }
 public:
  /// Set initial value.
  virtual void RmwInitial(void* rec) = 0;
  /// RCU: build the new record's value from the old record's value.
  virtual void RmwCopy(const void* old_rec, void* rec) = 0;
  /// in-place update; returns whatever the user's RmwAtomic() returns.
  virtual bool RmwAtomic(void* rec) = 0;
  /// Size, in bytes, of the value to be written -- presumably used when sizing the new
  /// record; NOTE(review): confirm at call sites.
  virtual uint32_t value_size() const = 0;
};
265
266/// A synchronous Rmw() context preserves its type information.
267template <class MC>
268class PendingRmwContext : public AsyncPendingRmwContext<typename MC::key_t> {
269 public:
270 typedef MC rmw_context_t;
271 typedef typename rmw_context_t::key_t key_t;
272 typedef typename rmw_context_t::value_t value_t;
273 typedef Record<key_t, value_t> record_t;
274
275 PendingRmwContext(rmw_context_t& caller_context_, AsyncCallback caller_callback_)
276 : AsyncPendingRmwContext<key_t>(caller_context_, caller_callback_) {
277 }
278 /// The deep copy constructor.
279 PendingRmwContext(PendingRmwContext& other, IAsyncContext* caller_context_)
280 : AsyncPendingRmwContext<key_t>(other, caller_context_) {
281 }
282 protected:
283 Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
284 return IAsyncContext::DeepCopy_Internal(*this, PendingContext<key_t>::caller_context,
285 context_copy);
286 }
287 private:
288 const rmw_context_t& rmw_context() const {
289 return *static_cast<const rmw_context_t*>(PendingContext<key_t>::caller_context);
290 }
291 rmw_context_t& rmw_context() {
292 return *static_cast<rmw_context_t*>(PendingContext<key_t>::caller_context);
293 }
294 public:
295 /// Accessors.
296 const key_t& key() const {
297 return rmw_context().key();
298 }
299 /// Set initial value.
300 inline void RmwInitial(void* rec) final {
301 record_t* record = reinterpret_cast<record_t*>(rec);
302 rmw_context().RmwInitial(record->value());
303 }
304 /// RCU.
305 inline void RmwCopy(const void* old_rec, void* rec) final {
306 const record_t* old_record = reinterpret_cast<const record_t*>(old_rec);
307 record_t* record = reinterpret_cast<record_t*>(rec);
308 rmw_context().RmwCopy(old_record->value(), record->value());
309 }
310 /// in-place update.
311 inline bool RmwAtomic(void* rec) final {
312 record_t* record = reinterpret_cast<record_t*>(rec);
313 return rmw_context().RmwAtomic(record->value());
314 }
315 inline constexpr uint32_t value_size() const final {
316 return rmw_context().value_size();
317 }
318};
319
320class AsyncIOContext;
321
/// Per-thread execution context. (Just the stuff that's checkpointed to disk.)
struct PersistentExecContext {
  PersistentExecContext()
    : serial_num{ 0 }
    , version{ 0 }
    , guid{} {
  }

  /// Reset this context for the session identified by 'guid_', running at checkpoint version
  /// 'version_', with operations numbered starting from 'serial_num_'.
  void Initialize(uint32_t version_, const Guid& guid_, uint64_t serial_num_) {
    serial_num = serial_num_;
    version = version_;
    guid = guid_;
  }

  /// Serial number of the session's operations -- presumably used to replay/skip operations
  /// during recovery; NOTE(review): confirm against the checkpoint/recovery code.
  uint64_t serial_num;
  /// Checkpoint version this thread is operating under.
  uint32_t version;
  /// Unique identifier for this session.
  Guid guid;
};
/// This struct is checkpointed to disk, so its size (and layout) must stay fixed.
static_assert(sizeof(PersistentExecContext) == 32, "sizeof(PersistentExecContext) != 32");
342
/// Per-thread execution context. (Also includes state kept in-memory-only.)
struct ExecutionContext : public PersistentExecContext {
  /// Default constructor.
  ExecutionContext()
    : phase{ Phase::INVALID }
    , io_id{ 0 } {
  }

  /// (Re)initialize this context for a session. The containers are expected to already be
  /// empty (asserted in debug builds); the clear() calls below guarantee it in release
  /// builds, where assert() compiles away.
  void Initialize(Phase phase_, uint32_t version_, const Guid& guid_, uint64_t serial_num_) {
    assert(retry_requests.empty());
    assert(pending_ios.empty());
    assert(io_responses.empty());

    PersistentExecContext::Initialize(version_, guid_, serial_num_);
    phase = phase_;
    retry_requests.clear();
    io_id = 0;
    pending_ios.clear();
    io_responses.clear();
  }

  /// Current checkpoint phase for this thread.
  Phase phase;

  /// Retry request contexts are stored inside the deque.
  std::deque<IAsyncContext*> retry_requests;
  /// Assign a unique ID to every I/O request.
  uint64_t io_id;
  /// For each pending I/O, maps io_id to the hash of the key being retrieved.
  std::unordered_map<uint64_t, KeyHash> pending_ios;

  /// The I/O completion thread hands the PendingContext back to the thread that issued the
  /// request.
  concurrent_queue<AsyncIOContext*> io_responses;
};
377
378}
379} // namespace FASTER::core
380