1 | #include "duckdb/catalog/catalog_entry/scalar_macro_catalog_entry.hpp" |
2 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
3 | #include "duckdb/catalog/catalog_entry/type_catalog_entry.hpp" |
4 | #include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp" |
5 | #include "duckdb/common/printer.hpp" |
6 | #include "duckdb/common/serializer/buffered_file_reader.hpp" |
7 | #include "duckdb/common/string_util.hpp" |
8 | #include "duckdb/main/client_context.hpp" |
9 | #include "duckdb/main/connection.hpp" |
10 | #include "duckdb/main/database.hpp" |
11 | #include "duckdb/parser/parsed_data/alter_table_info.hpp" |
12 | #include "duckdb/parser/parsed_data/create_schema_info.hpp" |
13 | #include "duckdb/parser/parsed_data/create_view_info.hpp" |
14 | #include "duckdb/parser/parsed_data/drop_info.hpp" |
15 | #include "duckdb/planner/binder.hpp" |
16 | #include "duckdb/planner/parsed_data/bound_create_table_info.hpp" |
17 | #include "duckdb/storage/data_table.hpp" |
18 | #include "duckdb/storage/write_ahead_log.hpp" |
19 | #include "duckdb/storage/storage_manager.hpp" |
20 | #include "duckdb/main/attached_database.hpp" |
21 | #include "duckdb/execution/index/art/art.hpp" |
22 | #include "duckdb/catalog/catalog_entry/duck_index_entry.hpp" |
23 | |
24 | namespace duckdb { |
25 | |
26 | bool WriteAheadLog::Replay(AttachedDatabase &database, string &path) { |
27 | Connection con(database.GetDatabase()); |
28 | auto initial_reader = make_uniq<BufferedFileReader>(args&: FileSystem::Get(db&: database), args: path.c_str(), args: con.context.get()); |
29 | if (initial_reader->Finished()) { |
30 | // WAL is empty |
31 | return false; |
32 | } |
33 | |
34 | con.BeginTransaction(); |
35 | |
36 | // first deserialize the WAL to look for a checkpoint flag |
37 | // if there is a checkpoint flag, we might have already flushed the contents of the WAL to disk |
38 | ReplayState checkpoint_state(database, *con.context, *initial_reader); |
39 | initial_reader->SetCatalog(checkpoint_state.catalog); |
40 | checkpoint_state.deserialize_only = true; |
41 | try { |
42 | while (true) { |
43 | // read the current entry |
44 | WALType entry_type = initial_reader->Read<WALType>(); |
45 | if (entry_type == WALType::WAL_FLUSH) { |
46 | // check if the file is exhausted |
47 | if (initial_reader->Finished()) { |
48 | // we finished reading the file: break |
49 | break; |
50 | } |
51 | } else { |
52 | // replay the entry |
53 | checkpoint_state.ReplayEntry(entry_type); |
54 | } |
55 | } |
56 | } catch (std::exception &ex) { // LCOV_EXCL_START |
57 | Printer::Print(str: StringUtil::Format(fmt_str: "Exception in WAL playback during initial read: %s\n" , params: ex.what())); |
58 | return false; |
59 | } catch (...) { |
60 | Printer::Print(str: "Unknown Exception in WAL playback during initial read" ); |
61 | return false; |
62 | } // LCOV_EXCL_STOP |
63 | initial_reader.reset(); |
64 | if (checkpoint_state.checkpoint_id != INVALID_BLOCK) { |
65 | // there is a checkpoint flag: check if we need to deserialize the WAL |
66 | auto &manager = database.GetStorageManager(); |
67 | if (manager.IsCheckpointClean(checkpoint_id: checkpoint_state.checkpoint_id)) { |
68 | // the contents of the WAL have already been checkpointed |
69 | // we can safely truncate the WAL and ignore its contents |
70 | return true; |
71 | } |
72 | } |
73 | |
74 | // we need to recover from the WAL: actually set up the replay state |
75 | BufferedFileReader reader(FileSystem::Get(db&: database), path.c_str(), con.context.get()); |
76 | reader.SetCatalog(checkpoint_state.catalog); |
77 | ReplayState state(database, *con.context, reader); |
78 | |
79 | // replay the WAL |
80 | // note that everything is wrapped inside a try/catch block here |
81 | // there can be errors in WAL replay because of a corrupt WAL file |
82 | // in this case we should throw a warning but startup anyway |
83 | try { |
84 | while (true) { |
85 | // read the current entry |
86 | WALType entry_type = reader.Read<WALType>(); |
87 | if (entry_type == WALType::WAL_FLUSH) { |
88 | // flush: commit the current transaction |
89 | con.Commit(); |
90 | // check if the file is exhausted |
91 | if (reader.Finished()) { |
92 | // we finished reading the file: break |
93 | break; |
94 | } |
95 | // otherwise we keep on reading |
96 | con.BeginTransaction(); |
97 | } else { |
98 | // replay the entry |
99 | state.ReplayEntry(entry_type); |
100 | } |
101 | } |
102 | } catch (std::exception &ex) { // LCOV_EXCL_START |
103 | // FIXME: this should report a proper warning in the connection |
104 | Printer::Print(str: StringUtil::Format(fmt_str: "Exception in WAL playback: %s\n" , params: ex.what())); |
105 | // exception thrown in WAL replay: rollback |
106 | con.Rollback(); |
107 | } catch (...) { |
108 | Printer::Print(str: "Unknown Exception in WAL playback: %s\n" ); |
109 | // exception thrown in WAL replay: rollback |
110 | con.Rollback(); |
111 | } // LCOV_EXCL_STOP |
112 | return false; |
113 | } |
114 | |
115 | //===--------------------------------------------------------------------===// |
116 | // Replay Entries |
117 | //===--------------------------------------------------------------------===// |
118 | void ReplayState::ReplayEntry(WALType entry_type) { |
119 | switch (entry_type) { |
120 | case WALType::CREATE_TABLE: |
121 | ReplayCreateTable(); |
122 | break; |
123 | case WALType::DROP_TABLE: |
124 | ReplayDropTable(); |
125 | break; |
126 | case WALType::ALTER_INFO: |
127 | ReplayAlter(); |
128 | break; |
129 | case WALType::CREATE_VIEW: |
130 | ReplayCreateView(); |
131 | break; |
132 | case WALType::DROP_VIEW: |
133 | ReplayDropView(); |
134 | break; |
135 | case WALType::CREATE_SCHEMA: |
136 | ReplayCreateSchema(); |
137 | break; |
138 | case WALType::DROP_SCHEMA: |
139 | ReplayDropSchema(); |
140 | break; |
141 | case WALType::CREATE_SEQUENCE: |
142 | ReplayCreateSequence(); |
143 | break; |
144 | case WALType::DROP_SEQUENCE: |
145 | ReplayDropSequence(); |
146 | break; |
147 | case WALType::SEQUENCE_VALUE: |
148 | ReplaySequenceValue(); |
149 | break; |
150 | case WALType::CREATE_MACRO: |
151 | ReplayCreateMacro(); |
152 | break; |
153 | case WALType::DROP_MACRO: |
154 | ReplayDropMacro(); |
155 | break; |
156 | case WALType::CREATE_TABLE_MACRO: |
157 | ReplayCreateTableMacro(); |
158 | break; |
159 | case WALType::DROP_TABLE_MACRO: |
160 | ReplayDropTableMacro(); |
161 | break; |
162 | case WALType::CREATE_INDEX: |
163 | ReplayCreateIndex(); |
164 | break; |
165 | case WALType::DROP_INDEX: |
166 | ReplayDropIndex(); |
167 | break; |
168 | case WALType::USE_TABLE: |
169 | ReplayUseTable(); |
170 | break; |
171 | case WALType::INSERT_TUPLE: |
172 | ReplayInsert(); |
173 | break; |
174 | case WALType::DELETE_TUPLE: |
175 | ReplayDelete(); |
176 | break; |
177 | case WALType::UPDATE_TUPLE: |
178 | ReplayUpdate(); |
179 | break; |
180 | case WALType::CHECKPOINT: |
181 | ReplayCheckpoint(); |
182 | break; |
183 | case WALType::CREATE_TYPE: |
184 | ReplayCreateType(); |
185 | break; |
186 | case WALType::DROP_TYPE: |
187 | ReplayDropType(); |
188 | break; |
189 | default: |
190 | throw InternalException("Invalid WAL entry type!" ); |
191 | } |
192 | } |
193 | |
194 | //===--------------------------------------------------------------------===// |
195 | // Replay Table |
196 | //===--------------------------------------------------------------------===// |
197 | void ReplayState::ReplayCreateTable() { |
198 | auto info = TableCatalogEntry::Deserialize(source, context); |
199 | if (deserialize_only) { |
200 | return; |
201 | } |
202 | |
203 | // bind the constraints to the table again |
204 | |
205 | auto binder = Binder::CreateBinder(context); |
206 | auto &schema = catalog.GetSchema(context, name: info->schema); |
207 | auto bound_info = binder->BindCreateTableInfo(info: std::move(info), schema); |
208 | |
209 | catalog.CreateTable(context, info&: *bound_info); |
210 | } |
211 | |
212 | void ReplayState::ReplayDropTable() { |
213 | DropInfo info; |
214 | |
215 | info.type = CatalogType::TABLE_ENTRY; |
216 | info.schema = source.Read<string>(); |
217 | info.name = source.Read<string>(); |
218 | if (deserialize_only) { |
219 | return; |
220 | } |
221 | |
222 | catalog.DropEntry(context, info); |
223 | } |
224 | |
225 | void ReplayState::ReplayAlter() { |
226 | auto info = AlterInfo::Deserialize(source); |
227 | if (deserialize_only) { |
228 | return; |
229 | } |
230 | catalog.Alter(context, info&: *info); |
231 | } |
232 | |
233 | //===--------------------------------------------------------------------===// |
234 | // Replay View |
235 | //===--------------------------------------------------------------------===// |
236 | void ReplayState::ReplayCreateView() { |
237 | auto entry = ViewCatalogEntry::Deserialize(source, context); |
238 | if (deserialize_only) { |
239 | return; |
240 | } |
241 | |
242 | catalog.CreateView(context, info&: *entry); |
243 | } |
244 | |
245 | void ReplayState::ReplayDropView() { |
246 | DropInfo info; |
247 | info.type = CatalogType::VIEW_ENTRY; |
248 | info.schema = source.Read<string>(); |
249 | info.name = source.Read<string>(); |
250 | if (deserialize_only) { |
251 | return; |
252 | } |
253 | catalog.DropEntry(context, info); |
254 | } |
255 | |
256 | //===--------------------------------------------------------------------===// |
257 | // Replay Schema |
258 | //===--------------------------------------------------------------------===// |
259 | void ReplayState::ReplayCreateSchema() { |
260 | CreateSchemaInfo info; |
261 | info.schema = source.Read<string>(); |
262 | if (deserialize_only) { |
263 | return; |
264 | } |
265 | |
266 | catalog.CreateSchema(context, info); |
267 | } |
268 | |
269 | void ReplayState::ReplayDropSchema() { |
270 | DropInfo info; |
271 | |
272 | info.type = CatalogType::SCHEMA_ENTRY; |
273 | info.name = source.Read<string>(); |
274 | if (deserialize_only) { |
275 | return; |
276 | } |
277 | |
278 | catalog.DropEntry(context, info); |
279 | } |
280 | |
281 | //===--------------------------------------------------------------------===// |
282 | // Replay Custom Type |
283 | //===--------------------------------------------------------------------===// |
284 | void ReplayState::ReplayCreateType() { |
285 | auto info = TypeCatalogEntry::Deserialize(source); |
286 | info->on_conflict = OnCreateConflict::IGNORE_ON_CONFLICT; |
287 | catalog.CreateType(context, info&: *info); |
288 | } |
289 | |
290 | void ReplayState::ReplayDropType() { |
291 | DropInfo info; |
292 | |
293 | info.type = CatalogType::TYPE_ENTRY; |
294 | info.schema = source.Read<string>(); |
295 | info.name = source.Read<string>(); |
296 | if (deserialize_only) { |
297 | return; |
298 | } |
299 | |
300 | catalog.DropEntry(context, info); |
301 | } |
302 | |
303 | //===--------------------------------------------------------------------===// |
304 | // Replay Sequence |
305 | //===--------------------------------------------------------------------===// |
306 | void ReplayState::ReplayCreateSequence() { |
307 | auto entry = SequenceCatalogEntry::Deserialize(source); |
308 | if (deserialize_only) { |
309 | return; |
310 | } |
311 | |
312 | catalog.CreateSequence(context, info&: *entry); |
313 | } |
314 | |
315 | void ReplayState::ReplayDropSequence() { |
316 | DropInfo info; |
317 | info.type = CatalogType::SEQUENCE_ENTRY; |
318 | info.schema = source.Read<string>(); |
319 | info.name = source.Read<string>(); |
320 | if (deserialize_only) { |
321 | return; |
322 | } |
323 | |
324 | catalog.DropEntry(context, info); |
325 | } |
326 | |
327 | void ReplayState::ReplaySequenceValue() { |
328 | auto schema = source.Read<string>(); |
329 | auto name = source.Read<string>(); |
330 | auto usage_count = source.Read<uint64_t>(); |
331 | auto counter = source.Read<int64_t>(); |
332 | if (deserialize_only) { |
333 | return; |
334 | } |
335 | |
336 | // fetch the sequence from the catalog |
337 | auto &seq = catalog.GetEntry<SequenceCatalogEntry>(context, schema_name: schema, name); |
338 | if (usage_count > seq.usage_count) { |
339 | seq.usage_count = usage_count; |
340 | seq.counter = counter; |
341 | } |
342 | } |
343 | |
344 | //===--------------------------------------------------------------------===// |
345 | // Replay Macro |
346 | //===--------------------------------------------------------------------===// |
347 | void ReplayState::ReplayCreateMacro() { |
348 | auto entry = ScalarMacroCatalogEntry::Deserialize(main_source&: source, context); |
349 | if (deserialize_only) { |
350 | return; |
351 | } |
352 | |
353 | catalog.CreateFunction(context, info&: *entry); |
354 | } |
355 | |
356 | void ReplayState::ReplayDropMacro() { |
357 | DropInfo info; |
358 | info.type = CatalogType::MACRO_ENTRY; |
359 | info.schema = source.Read<string>(); |
360 | info.name = source.Read<string>(); |
361 | if (deserialize_only) { |
362 | return; |
363 | } |
364 | |
365 | catalog.DropEntry(context, info); |
366 | } |
367 | |
368 | //===--------------------------------------------------------------------===// |
369 | // Replay Table Macro |
370 | //===--------------------------------------------------------------------===// |
371 | void ReplayState::ReplayCreateTableMacro() { |
372 | auto entry = TableMacroCatalogEntry::Deserialize(main_source&: source, context); |
373 | if (deserialize_only) { |
374 | return; |
375 | } |
376 | |
377 | catalog.CreateFunction(context, info&: *entry); |
378 | } |
379 | |
380 | void ReplayState::ReplayDropTableMacro() { |
381 | DropInfo info; |
382 | info.type = CatalogType::TABLE_MACRO_ENTRY; |
383 | info.schema = source.Read<string>(); |
384 | info.name = source.Read<string>(); |
385 | if (deserialize_only) { |
386 | return; |
387 | } |
388 | |
389 | catalog.DropEntry(context, info); |
390 | } |
391 | |
392 | //===--------------------------------------------------------------------===// |
393 | // Replay Index |
394 | //===--------------------------------------------------------------------===// |
395 | void ReplayState::ReplayCreateIndex() { |
396 | auto info = IndexCatalogEntry::Deserialize(source, context); |
397 | if (deserialize_only) { |
398 | return; |
399 | } |
400 | |
401 | // get the physical table to which we'll add the index |
402 | auto &table = catalog.GetEntry<TableCatalogEntry>(context, schema_name: info->schema, name: info->table->table_name); |
403 | auto &data_table = table.GetStorage(); |
404 | |
405 | // bind the parsed expressions |
406 | if (info->expressions.empty()) { |
407 | for (auto &parsed_expr : info->parsed_expressions) { |
408 | info->expressions.push_back(x: parsed_expr->Copy()); |
409 | } |
410 | } |
411 | auto binder = Binder::CreateBinder(context); |
412 | auto expressions = binder->BindCreateIndexExpressions(table, info&: *info); |
413 | |
414 | // create the empty index |
415 | unique_ptr<Index> index; |
416 | switch (info->index_type) { |
417 | case IndexType::ART: { |
418 | index = make_uniq<ART>(args&: info->column_ids, args&: TableIOManager::Get(table&: data_table), args&: expressions, args&: info->constraint_type, |
419 | args&: data_table.db); |
420 | break; |
421 | } |
422 | default: |
423 | throw InternalException("Unimplemented index type" ); |
424 | } |
425 | |
426 | // add the index to the catalog |
427 | auto &index_entry = catalog.CreateIndex(context, info&: *info)->Cast<DuckIndexEntry>(); |
428 | index_entry.index = index.get(); |
429 | index_entry.info = data_table.info; |
430 | for (auto &parsed_expr : info->parsed_expressions) { |
431 | index_entry.parsed_expressions.push_back(x: parsed_expr->Copy()); |
432 | } |
433 | |
434 | // physically add the index to the data table storage |
435 | data_table.WALAddIndex(context, index: std::move(index), expressions); |
436 | } |
437 | |
438 | void ReplayState::ReplayDropIndex() { |
439 | DropInfo info; |
440 | info.type = CatalogType::INDEX_ENTRY; |
441 | info.schema = source.Read<string>(); |
442 | info.name = source.Read<string>(); |
443 | if (deserialize_only) { |
444 | return; |
445 | } |
446 | |
447 | catalog.DropEntry(context, info); |
448 | } |
449 | |
450 | //===--------------------------------------------------------------------===// |
451 | // Replay Data |
452 | //===--------------------------------------------------------------------===// |
453 | void ReplayState::ReplayUseTable() { |
454 | auto schema_name = source.Read<string>(); |
455 | auto table_name = source.Read<string>(); |
456 | if (deserialize_only) { |
457 | return; |
458 | } |
459 | current_table = &catalog.GetEntry<TableCatalogEntry>(context, schema_name, name: table_name); |
460 | } |
461 | |
462 | void ReplayState::ReplayInsert() { |
463 | DataChunk chunk; |
464 | chunk.Deserialize(source); |
465 | if (deserialize_only) { |
466 | return; |
467 | } |
468 | if (!current_table) { |
469 | throw Exception("Corrupt WAL: insert without table" ); |
470 | } |
471 | |
472 | // append to the current table |
473 | current_table->GetStorage().LocalAppend(table&: *current_table, context, chunk); |
474 | } |
475 | |
476 | void ReplayState::ReplayDelete() { |
477 | DataChunk chunk; |
478 | chunk.Deserialize(source); |
479 | if (deserialize_only) { |
480 | return; |
481 | } |
482 | if (!current_table) { |
483 | throw InternalException("Corrupt WAL: delete without table" ); |
484 | } |
485 | |
486 | D_ASSERT(chunk.ColumnCount() == 1 && chunk.data[0].GetType() == LogicalType::ROW_TYPE); |
487 | row_t row_ids[1]; |
488 | Vector row_identifiers(LogicalType::ROW_TYPE, data_ptr_cast(src: row_ids)); |
489 | |
490 | auto source_ids = FlatVector::GetData<row_t>(vector&: chunk.data[0]); |
491 | // delete the tuples from the current table |
492 | for (idx_t i = 0; i < chunk.size(); i++) { |
493 | row_ids[0] = source_ids[i]; |
494 | current_table->GetStorage().Delete(table&: *current_table, context, row_identifiers, count: 1); |
495 | } |
496 | } |
497 | |
498 | void ReplayState::ReplayUpdate() { |
499 | vector<column_t> column_path; |
500 | auto column_index_count = source.Read<idx_t>(); |
501 | column_path.reserve(n: column_index_count); |
502 | for (idx_t i = 0; i < column_index_count; i++) { |
503 | column_path.push_back(x: source.Read<column_t>()); |
504 | } |
505 | DataChunk chunk; |
506 | chunk.Deserialize(source); |
507 | if (deserialize_only) { |
508 | return; |
509 | } |
510 | if (!current_table) { |
511 | throw InternalException("Corrupt WAL: update without table" ); |
512 | } |
513 | |
514 | if (column_path[0] >= current_table->GetColumns().PhysicalColumnCount()) { |
515 | throw InternalException("Corrupt WAL: column index for update out of bounds" ); |
516 | } |
517 | |
518 | // remove the row id vector from the chunk |
519 | auto row_ids = std::move(chunk.data.back()); |
520 | chunk.data.pop_back(); |
521 | |
522 | // now perform the update |
523 | current_table->GetStorage().UpdateColumn(table&: *current_table, context, row_ids, column_path, updates&: chunk); |
524 | } |
525 | |
526 | void ReplayState::ReplayCheckpoint() { |
527 | checkpoint_id = source.Read<block_id_t>(); |
528 | } |
529 | |
530 | } // namespace duckdb |
531 | |