1 | #include "duckdb/storage/write_ahead_log.hpp" |
2 | #include "duckdb/storage/data_table.hpp" |
3 | #include "duckdb/common/serializer/buffered_file_reader.hpp" |
4 | #include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" |
5 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
6 | #include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp" |
7 | #include "duckdb/main/client_context.hpp" |
8 | #include "duckdb/main/database.hpp" |
9 | #include "duckdb/parser/parsed_data/alter_table_info.hpp" |
10 | #include "duckdb/parser/parsed_data/drop_info.hpp" |
11 | #include "duckdb/parser/parsed_data/create_schema_info.hpp" |
12 | #include "duckdb/parser/parsed_data/create_table_info.hpp" |
13 | #include "duckdb/parser/parsed_data/create_view_info.hpp" |
14 | #include "duckdb/planner/binder.hpp" |
15 | #include "duckdb/planner/parsed_data/bound_create_table_info.hpp" |
16 | |
17 | using namespace duckdb; |
18 | using namespace std; |
19 | |
20 | class ReplayState { |
21 | public: |
22 | ReplayState(DuckDB &db, ClientContext &context, Deserializer &source) |
23 | : db(db), context(context), source(source), current_table(nullptr) { |
24 | } |
25 | |
26 | DuckDB &db; |
27 | ClientContext &context; |
28 | Deserializer &source; |
29 | TableCatalogEntry *current_table; |
30 | |
31 | public: |
32 | void ReplayEntry(WALType entry_type); |
33 | |
34 | private: |
35 | void ReplayCreateTable(); |
36 | void ReplayDropTable(); |
37 | void ReplayAlter(); |
38 | |
39 | void ReplayCreateView(); |
40 | void ReplayDropView(); |
41 | |
42 | void ReplayCreateSchema(); |
43 | void ReplayDropSchema(); |
44 | |
45 | void ReplayCreateSequence(); |
46 | void ReplayDropSequence(); |
47 | void ReplaySequenceValue(); |
48 | |
49 | void ReplayUseTable(); |
50 | void ReplayInsert(); |
51 | void ReplayDelete(); |
52 | void ReplayUpdate(); |
53 | }; |
54 | |
55 | void WriteAheadLog::Replay(DuckDB &database, string &path) { |
56 | BufferedFileReader reader(*database.file_system, path.c_str()); |
57 | |
58 | if (reader.Finished()) { |
59 | // WAL is empty |
60 | return; |
61 | } |
62 | |
63 | ClientContext context(database); |
64 | context.transaction.SetAutoCommit(false); |
65 | context.transaction.BeginTransaction(); |
66 | |
67 | ReplayState state(database, context, reader); |
68 | |
69 | // replay the WAL |
70 | // note that everything is wrapped inside a try/catch block here |
71 | // there can be errors in WAL replay because of a corrupt WAL file |
72 | // in this case we should throw a warning but startup anyway |
73 | try { |
74 | while (true) { |
75 | // read the current entry |
76 | WALType entry_type = reader.Read<WALType>(); |
77 | if (entry_type == WALType::WAL_FLUSH) { |
78 | // flush: commit the current transaction |
79 | context.transaction.Commit(); |
80 | context.transaction.SetAutoCommit(false); |
81 | // check if the file is exhausted |
82 | if (reader.Finished()) { |
83 | // we finished reading the file: break |
84 | break; |
85 | } |
86 | // otherwise we keep on reading |
87 | context.transaction.BeginTransaction(); |
88 | } else { |
89 | // replay the entry |
90 | state.ReplayEntry(entry_type); |
91 | } |
92 | } |
93 | } catch (std::exception &ex) { |
94 | // FIXME: this report a proper warning in the connection |
95 | fprintf(stderr, "Exception in WAL playback: %s\n" , ex.what()); |
96 | // exception thrown in WAL replay: rollback |
97 | context.transaction.Rollback(); |
98 | } |
99 | } |
100 | |
101 | //===--------------------------------------------------------------------===// |
102 | // Replay Entries |
103 | //===--------------------------------------------------------------------===// |
104 | void ReplayState::ReplayEntry(WALType entry_type) { |
105 | switch (entry_type) { |
106 | case WALType::CREATE_TABLE: |
107 | ReplayCreateTable(); |
108 | break; |
109 | case WALType::DROP_TABLE: |
110 | ReplayDropTable(); |
111 | break; |
112 | case WALType::ALTER_INFO: |
113 | ReplayAlter(); |
114 | break; |
115 | case WALType::CREATE_VIEW: |
116 | ReplayCreateView(); |
117 | break; |
118 | case WALType::DROP_VIEW: |
119 | ReplayDropView(); |
120 | break; |
121 | case WALType::CREATE_SCHEMA: |
122 | ReplayCreateSchema(); |
123 | break; |
124 | case WALType::DROP_SCHEMA: |
125 | ReplayDropSchema(); |
126 | break; |
127 | case WALType::CREATE_SEQUENCE: |
128 | ReplayCreateSequence(); |
129 | break; |
130 | case WALType::DROP_SEQUENCE: |
131 | ReplayDropSequence(); |
132 | break; |
133 | case WALType::SEQUENCE_VALUE: |
134 | ReplaySequenceValue(); |
135 | break; |
136 | case WALType::USE_TABLE: |
137 | ReplayUseTable(); |
138 | break; |
139 | case WALType::INSERT_TUPLE: |
140 | ReplayInsert(); |
141 | break; |
142 | case WALType::DELETE_TUPLE: |
143 | ReplayDelete(); |
144 | break; |
145 | case WALType::UPDATE_TUPLE: |
146 | ReplayUpdate(); |
147 | break; |
148 | default: |
149 | throw Exception("Invalid WAL entry type!" ); |
150 | } |
151 | } |
152 | |
153 | //===--------------------------------------------------------------------===// |
154 | // Replay Table |
155 | //===--------------------------------------------------------------------===// |
156 | void ReplayState::ReplayCreateTable() { |
157 | auto info = TableCatalogEntry::Deserialize(source); |
158 | |
159 | // bind the constraints to the table again |
160 | Binder binder(context); |
161 | auto bound_info = binder.BindCreateTableInfo(move(info)); |
162 | |
163 | db.catalog->CreateTable(context, bound_info.get()); |
164 | } |
165 | |
166 | void ReplayState::ReplayDropTable() { |
167 | DropInfo info; |
168 | |
169 | info.type = CatalogType::TABLE; |
170 | info.schema = source.Read<string>(); |
171 | info.name = source.Read<string>(); |
172 | |
173 | db.catalog->DropEntry(context, &info); |
174 | } |
175 | |
176 | void ReplayState::ReplayAlter() { |
177 | auto info = AlterInfo::Deserialize(source); |
178 | if (info->type != AlterType::ALTER_TABLE) { |
179 | throw Exception("Expected ALTER TABLE!" ); |
180 | } |
181 | |
182 | db.catalog->AlterTable(context, (AlterTableInfo *)info.get()); |
183 | } |
184 | |
185 | //===--------------------------------------------------------------------===// |
186 | // Replay View |
187 | //===--------------------------------------------------------------------===// |
188 | void ReplayState::ReplayCreateView() { |
189 | auto entry = ViewCatalogEntry::Deserialize(source); |
190 | |
191 | db.catalog->CreateView(context, entry.get()); |
192 | } |
193 | |
194 | void ReplayState::ReplayDropView() { |
195 | DropInfo info; |
196 | info.type = CatalogType::VIEW; |
197 | info.schema = source.Read<string>(); |
198 | info.name = source.Read<string>(); |
199 | db.catalog->DropEntry(context, &info); |
200 | } |
201 | |
202 | //===--------------------------------------------------------------------===// |
203 | // Replay Schema |
204 | //===--------------------------------------------------------------------===// |
205 | void ReplayState::ReplayCreateSchema() { |
206 | CreateSchemaInfo info; |
207 | info.schema = source.Read<string>(); |
208 | |
209 | db.catalog->CreateSchema(context, &info); |
210 | } |
211 | |
212 | void ReplayState::ReplayDropSchema() { |
213 | DropInfo info; |
214 | |
215 | info.type = CatalogType::SCHEMA; |
216 | info.name = source.Read<string>(); |
217 | |
218 | db.catalog->DropEntry(context, &info); |
219 | } |
220 | |
221 | //===--------------------------------------------------------------------===// |
222 | // Replay Sequence |
223 | //===--------------------------------------------------------------------===// |
224 | void ReplayState::ReplayCreateSequence() { |
225 | auto entry = SequenceCatalogEntry::Deserialize(source); |
226 | |
227 | db.catalog->CreateSequence(context, entry.get()); |
228 | } |
229 | |
230 | void ReplayState::ReplayDropSequence() { |
231 | DropInfo info; |
232 | info.type = CatalogType::SEQUENCE; |
233 | info.schema = source.Read<string>(); |
234 | info.name = source.Read<string>(); |
235 | |
236 | db.catalog->DropEntry(context, &info); |
237 | } |
238 | |
239 | void ReplayState::ReplaySequenceValue() { |
240 | auto schema = source.Read<string>(); |
241 | auto name = source.Read<string>(); |
242 | auto usage_count = source.Read<uint64_t>(); |
243 | auto counter = source.Read<int64_t>(); |
244 | |
245 | // fetch the sequence from the catalog |
246 | auto seq = db.catalog->GetEntry<SequenceCatalogEntry>(context, schema, name); |
247 | if (usage_count > seq->usage_count) { |
248 | seq->usage_count = usage_count; |
249 | seq->counter = counter; |
250 | } |
251 | } |
252 | |
253 | //===--------------------------------------------------------------------===// |
254 | // Replay Data |
255 | //===--------------------------------------------------------------------===// |
256 | void ReplayState::ReplayUseTable() { |
257 | auto schema_name = source.Read<string>(); |
258 | auto table_name = source.Read<string>(); |
259 | current_table = db.catalog->GetEntry<TableCatalogEntry>(context, schema_name, table_name); |
260 | } |
261 | |
262 | void ReplayState::ReplayInsert() { |
263 | if (!current_table) { |
264 | throw Exception("Corrupt WAL: insert without table" ); |
265 | } |
266 | DataChunk chunk; |
267 | chunk.Deserialize(source); |
268 | |
269 | // append to the current table |
270 | current_table->storage->Append(*current_table, context, chunk); |
271 | } |
272 | |
273 | void ReplayState::ReplayDelete() { |
274 | if (!current_table) { |
275 | throw Exception("Corrupt WAL: delete without table" ); |
276 | } |
277 | DataChunk chunk; |
278 | chunk.Deserialize(source); |
279 | |
280 | assert(chunk.column_count() == 1 && chunk.data[0].type == ROW_TYPE); |
281 | row_t row_ids[1]; |
282 | Vector row_identifiers(ROW_TYPE, (data_ptr_t)row_ids); |
283 | |
284 | auto source_ids = FlatVector::GetData<row_t>(chunk.data[0]); |
285 | // delete the tuples from the current table |
286 | for (idx_t i = 0; i < chunk.size(); i++) { |
287 | row_ids[0] = source_ids[i]; |
288 | current_table->storage->Delete(*current_table, context, row_identifiers, 1); |
289 | } |
290 | } |
291 | |
292 | void ReplayState::ReplayUpdate() { |
293 | if (!current_table) { |
294 | throw Exception("Corrupt WAL: update without table" ); |
295 | } |
296 | |
297 | idx_t column_index = source.Read<column_t>(); |
298 | |
299 | DataChunk chunk; |
300 | chunk.Deserialize(source); |
301 | |
302 | vector<column_t> column_ids{column_index}; |
303 | if (column_index >= current_table->columns.size()) { |
304 | throw Exception("Corrupt WAL: column index for update out of bounds" ); |
305 | } |
306 | |
307 | // remove the row id vector from the chunk |
308 | auto row_ids = move(chunk.data.back()); |
309 | chunk.data.pop_back(); |
310 | |
311 | // now perform the update |
312 | current_table->storage->Update(*current_table, context, row_ids, column_ids, chunk); |
313 | } |
314 | |