1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT license.
3
4#pragma once
5
6#include <atomic>
7#include <cstdint>
8#include "phase.h"
9
10namespace FASTER {
11namespace core {
12
13struct ResizeInfo {
14 uint8_t version;
15};
16
17/// Each FASTER store can perform only one action at a time (checkpoint, recovery, garbage
18// collect, or grow index).
19enum class Action : uint8_t {
20 None = 0,
21 CheckpointFull,
22 CheckpointIndex,
23 CheckpointHybridLog,
24 Recover,
25 GC,
26 GrowIndex
27};
28
29struct SystemState {
30 SystemState(Action action_, Phase phase_, uint32_t version_)
31 : control_{ 0 } {
32 action = action_;
33 phase = phase_;
34 version = version_;
35 }
36 SystemState(uint64_t control)
37 : control_{ control } {
38 }
39 SystemState(const SystemState& other)
40 : control_{ other.control_ } {
41 }
42
43 inline SystemState& operator=(const SystemState& other) {
44 control_ = other.control_;
45 return *this;
46 }
47 inline bool operator==(const SystemState& other) {
48 return control_ == other.control_;
49 }
50 inline bool operator!=(const SystemState& other) {
51 return control_ != other.control_;
52 }
53
54 /// The state transitions.
55 inline SystemState GetNextState() const {
56 switch(action) {
57 case Action::CheckpointFull:
58 switch(phase) {
59 case Phase::REST:
60 return SystemState{ Action::CheckpointFull, Phase::PREP_INDEX_CHKPT, version };
61 case Phase::PREP_INDEX_CHKPT:
62 return SystemState{ Action::CheckpointFull, Phase::INDEX_CHKPT, version };
63 case Phase::INDEX_CHKPT:
64 return SystemState{ Action::CheckpointFull, Phase::PREPARE, version };
65 case Phase::PREPARE:
66 return SystemState{ Action::CheckpointFull, Phase::IN_PROGRESS, version + 1 };
67 case Phase::IN_PROGRESS:
68 return SystemState{ Action::CheckpointFull, Phase::WAIT_PENDING, version };
69 case Phase::WAIT_PENDING:
70 return SystemState{ Action::CheckpointFull, Phase::WAIT_FLUSH, version };
71 case Phase::WAIT_FLUSH:
72 return SystemState{ Action::CheckpointFull, Phase::PERSISTENCE_CALLBACK, version };
73 case Phase::PERSISTENCE_CALLBACK:
74 return SystemState{ Action::CheckpointFull, Phase::REST, version };
75 default:
76 // not reached
77 assert(false);
78 return SystemState(UINT64_MAX);
79 }
80 break;
81 case Action::CheckpointIndex:
82 switch(phase) {
83 case Phase::REST:
84 return SystemState{ Action::CheckpointIndex, Phase::PREP_INDEX_CHKPT, version };
85 case Phase::PREP_INDEX_CHKPT:
86 return SystemState{ Action::CheckpointIndex, Phase::INDEX_CHKPT, version };
87 case Phase::INDEX_CHKPT:
88 return SystemState{ Action::CheckpointIndex, Phase::REST, version };
89 default:
90 // not reached
91 assert(false);
92 return SystemState(UINT64_MAX);
93 }
94 break;
95 case Action::CheckpointHybridLog:
96 switch(phase) {
97 case Phase::REST:
98 return SystemState{ Action::CheckpointHybridLog, Phase::PREPARE, version };
99 case Phase::PREPARE:
100 return SystemState{ Action::CheckpointHybridLog, Phase::IN_PROGRESS, version + 1 };
101 case Phase::IN_PROGRESS:
102 return SystemState{ Action::CheckpointHybridLog, Phase::WAIT_PENDING, version };
103 case Phase::WAIT_PENDING:
104 return SystemState{ Action::CheckpointHybridLog, Phase::WAIT_FLUSH, version };
105 case Phase::WAIT_FLUSH:
106 return SystemState{ Action::CheckpointHybridLog, Phase::PERSISTENCE_CALLBACK, version };
107 case Phase::PERSISTENCE_CALLBACK:
108 return SystemState{ Action::CheckpointHybridLog, Phase::REST, version };
109 default:
110 // not reached
111 assert(false);
112 return SystemState(UINT64_MAX);
113 }
114 break;
115 case Action::GC:
116 switch(phase) {
117 case Phase::REST:
118 return SystemState{ Action::GC, Phase::GC_IO_PENDING, version };
119 case Phase::GC_IO_PENDING:
120 return SystemState{ Action::GC, Phase::GC_IN_PROGRESS, version };
121 case Phase::GC_IN_PROGRESS:
122 return SystemState{ Action::GC, Phase::REST, version };
123 default:
124 // not reached
125 assert(false);
126 return SystemState(UINT64_MAX);
127 }
128 break;
129 case Action::GrowIndex:
130 switch(phase) {
131 case Phase::REST:
132 return SystemState{ Action::GrowIndex, Phase::GROW_PREPARE, version };
133 case Phase::GROW_PREPARE:
134 return SystemState{ Action::GrowIndex, Phase::GROW_IN_PROGRESS, version };
135 case Phase::GROW_IN_PROGRESS:
136 return SystemState{ Action::GrowIndex, Phase::REST, version };
137 default:
138 // not reached
139 assert(false);
140 return SystemState(UINT64_MAX);
141 }
142 default:
143 // not reached
144 assert(false);
145 return SystemState(UINT64_MAX);
146 }
147 }
148
149 union {
150 struct {
151 /// Action being performed (checkpoint, recover, or gc).
152 Action action;
153 /// Phase of that action currently being executed.
154 Phase phase;
155 /// Checkpoint version (used for CPR).
156 uint32_t version;
157 };
158 uint64_t control_;
159 };
160};
161static_assert(sizeof(SystemState) == 8, "sizeof(SystemState) != 8");
162
163class AtomicSystemState {
164 public:
165 AtomicSystemState(Action action_, Phase phase_, uint32_t version_) {
166 SystemState state{ action_, phase_, version_ };
167 control_.store(state.control_);
168 }
169
170 /// Atomic access.
171 inline SystemState load() const {
172 return SystemState{ control_.load() };
173 }
174 inline void store(SystemState value) {
175 control_.store(value.control_);
176 }
177 inline bool compare_exchange_strong(SystemState& expected, SystemState desired) {
178 uint64_t expected_control = expected.control_;
179 bool result = control_.compare_exchange_strong(expected_control, desired.control_);
180 expected = SystemState{ expected_control };
181 return result;
182 }
183
184 /// Accessors.
185 inline Phase phase() const {
186 return load().phase;
187 }
188 inline uint32_t version() const {
189 return load().version;
190 }
191
192 private:
193 /// Atomic access to the system state.
194 std::atomic<uint64_t> control_;
195};
196
197}
198} // namespace FASTER::core
199