1#include "duckdb/catalog/catalog_entry/scalar_macro_catalog_entry.hpp"
2#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
3#include "duckdb/catalog/catalog_entry/type_catalog_entry.hpp"
4#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
5#include "duckdb/common/printer.hpp"
6#include "duckdb/common/serializer/buffered_file_reader.hpp"
7#include "duckdb/common/string_util.hpp"
8#include "duckdb/main/client_context.hpp"
9#include "duckdb/main/connection.hpp"
10#include "duckdb/main/database.hpp"
11#include "duckdb/parser/parsed_data/alter_table_info.hpp"
12#include "duckdb/parser/parsed_data/create_schema_info.hpp"
13#include "duckdb/parser/parsed_data/create_view_info.hpp"
14#include "duckdb/parser/parsed_data/drop_info.hpp"
15#include "duckdb/planner/binder.hpp"
16#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
17#include "duckdb/storage/data_table.hpp"
18#include "duckdb/storage/write_ahead_log.hpp"
19#include "duckdb/storage/storage_manager.hpp"
20#include "duckdb/main/attached_database.hpp"
21#include "duckdb/execution/index/art/art.hpp"
22#include "duckdb/catalog/catalog_entry/duck_index_entry.hpp"
23
24namespace duckdb {
25
26bool WriteAheadLog::Replay(AttachedDatabase &database, string &path) {
27 Connection con(database.GetDatabase());
28 auto initial_reader = make_uniq<BufferedFileReader>(args&: FileSystem::Get(db&: database), args: path.c_str(), args: con.context.get());
29 if (initial_reader->Finished()) {
30 // WAL is empty
31 return false;
32 }
33
34 con.BeginTransaction();
35
36 // first deserialize the WAL to look for a checkpoint flag
37 // if there is a checkpoint flag, we might have already flushed the contents of the WAL to disk
38 ReplayState checkpoint_state(database, *con.context, *initial_reader);
39 initial_reader->SetCatalog(checkpoint_state.catalog);
40 checkpoint_state.deserialize_only = true;
41 try {
42 while (true) {
43 // read the current entry
44 WALType entry_type = initial_reader->Read<WALType>();
45 if (entry_type == WALType::WAL_FLUSH) {
46 // check if the file is exhausted
47 if (initial_reader->Finished()) {
48 // we finished reading the file: break
49 break;
50 }
51 } else {
52 // replay the entry
53 checkpoint_state.ReplayEntry(entry_type);
54 }
55 }
56 } catch (std::exception &ex) { // LCOV_EXCL_START
57 Printer::Print(str: StringUtil::Format(fmt_str: "Exception in WAL playback during initial read: %s\n", params: ex.what()));
58 return false;
59 } catch (...) {
60 Printer::Print(str: "Unknown Exception in WAL playback during initial read");
61 return false;
62 } // LCOV_EXCL_STOP
63 initial_reader.reset();
64 if (checkpoint_state.checkpoint_id != INVALID_BLOCK) {
65 // there is a checkpoint flag: check if we need to deserialize the WAL
66 auto &manager = database.GetStorageManager();
67 if (manager.IsCheckpointClean(checkpoint_id: checkpoint_state.checkpoint_id)) {
68 // the contents of the WAL have already been checkpointed
69 // we can safely truncate the WAL and ignore its contents
70 return true;
71 }
72 }
73
74 // we need to recover from the WAL: actually set up the replay state
75 BufferedFileReader reader(FileSystem::Get(db&: database), path.c_str(), con.context.get());
76 reader.SetCatalog(checkpoint_state.catalog);
77 ReplayState state(database, *con.context, reader);
78
79 // replay the WAL
80 // note that everything is wrapped inside a try/catch block here
81 // there can be errors in WAL replay because of a corrupt WAL file
82 // in this case we should throw a warning but startup anyway
83 try {
84 while (true) {
85 // read the current entry
86 WALType entry_type = reader.Read<WALType>();
87 if (entry_type == WALType::WAL_FLUSH) {
88 // flush: commit the current transaction
89 con.Commit();
90 // check if the file is exhausted
91 if (reader.Finished()) {
92 // we finished reading the file: break
93 break;
94 }
95 // otherwise we keep on reading
96 con.BeginTransaction();
97 } else {
98 // replay the entry
99 state.ReplayEntry(entry_type);
100 }
101 }
102 } catch (std::exception &ex) { // LCOV_EXCL_START
103 // FIXME: this should report a proper warning in the connection
104 Printer::Print(str: StringUtil::Format(fmt_str: "Exception in WAL playback: %s\n", params: ex.what()));
105 // exception thrown in WAL replay: rollback
106 con.Rollback();
107 } catch (...) {
108 Printer::Print(str: "Unknown Exception in WAL playback: %s\n");
109 // exception thrown in WAL replay: rollback
110 con.Rollback();
111 } // LCOV_EXCL_STOP
112 return false;
113}
114
115//===--------------------------------------------------------------------===//
116// Replay Entries
117//===--------------------------------------------------------------------===//
118void ReplayState::ReplayEntry(WALType entry_type) {
119 switch (entry_type) {
120 case WALType::CREATE_TABLE:
121 ReplayCreateTable();
122 break;
123 case WALType::DROP_TABLE:
124 ReplayDropTable();
125 break;
126 case WALType::ALTER_INFO:
127 ReplayAlter();
128 break;
129 case WALType::CREATE_VIEW:
130 ReplayCreateView();
131 break;
132 case WALType::DROP_VIEW:
133 ReplayDropView();
134 break;
135 case WALType::CREATE_SCHEMA:
136 ReplayCreateSchema();
137 break;
138 case WALType::DROP_SCHEMA:
139 ReplayDropSchema();
140 break;
141 case WALType::CREATE_SEQUENCE:
142 ReplayCreateSequence();
143 break;
144 case WALType::DROP_SEQUENCE:
145 ReplayDropSequence();
146 break;
147 case WALType::SEQUENCE_VALUE:
148 ReplaySequenceValue();
149 break;
150 case WALType::CREATE_MACRO:
151 ReplayCreateMacro();
152 break;
153 case WALType::DROP_MACRO:
154 ReplayDropMacro();
155 break;
156 case WALType::CREATE_TABLE_MACRO:
157 ReplayCreateTableMacro();
158 break;
159 case WALType::DROP_TABLE_MACRO:
160 ReplayDropTableMacro();
161 break;
162 case WALType::CREATE_INDEX:
163 ReplayCreateIndex();
164 break;
165 case WALType::DROP_INDEX:
166 ReplayDropIndex();
167 break;
168 case WALType::USE_TABLE:
169 ReplayUseTable();
170 break;
171 case WALType::INSERT_TUPLE:
172 ReplayInsert();
173 break;
174 case WALType::DELETE_TUPLE:
175 ReplayDelete();
176 break;
177 case WALType::UPDATE_TUPLE:
178 ReplayUpdate();
179 break;
180 case WALType::CHECKPOINT:
181 ReplayCheckpoint();
182 break;
183 case WALType::CREATE_TYPE:
184 ReplayCreateType();
185 break;
186 case WALType::DROP_TYPE:
187 ReplayDropType();
188 break;
189 default:
190 throw InternalException("Invalid WAL entry type!");
191 }
192}
193
194//===--------------------------------------------------------------------===//
195// Replay Table
196//===--------------------------------------------------------------------===//
197void ReplayState::ReplayCreateTable() {
198 auto info = TableCatalogEntry::Deserialize(source, context);
199 if (deserialize_only) {
200 return;
201 }
202
203 // bind the constraints to the table again
204
205 auto binder = Binder::CreateBinder(context);
206 auto &schema = catalog.GetSchema(context, name: info->schema);
207 auto bound_info = binder->BindCreateTableInfo(info: std::move(info), schema);
208
209 catalog.CreateTable(context, info&: *bound_info);
210}
211
212void ReplayState::ReplayDropTable() {
213 DropInfo info;
214
215 info.type = CatalogType::TABLE_ENTRY;
216 info.schema = source.Read<string>();
217 info.name = source.Read<string>();
218 if (deserialize_only) {
219 return;
220 }
221
222 catalog.DropEntry(context, info);
223}
224
225void ReplayState::ReplayAlter() {
226 auto info = AlterInfo::Deserialize(source);
227 if (deserialize_only) {
228 return;
229 }
230 catalog.Alter(context, info&: *info);
231}
232
233//===--------------------------------------------------------------------===//
234// Replay View
235//===--------------------------------------------------------------------===//
236void ReplayState::ReplayCreateView() {
237 auto entry = ViewCatalogEntry::Deserialize(source, context);
238 if (deserialize_only) {
239 return;
240 }
241
242 catalog.CreateView(context, info&: *entry);
243}
244
245void ReplayState::ReplayDropView() {
246 DropInfo info;
247 info.type = CatalogType::VIEW_ENTRY;
248 info.schema = source.Read<string>();
249 info.name = source.Read<string>();
250 if (deserialize_only) {
251 return;
252 }
253 catalog.DropEntry(context, info);
254}
255
256//===--------------------------------------------------------------------===//
257// Replay Schema
258//===--------------------------------------------------------------------===//
259void ReplayState::ReplayCreateSchema() {
260 CreateSchemaInfo info;
261 info.schema = source.Read<string>();
262 if (deserialize_only) {
263 return;
264 }
265
266 catalog.CreateSchema(context, info);
267}
268
269void ReplayState::ReplayDropSchema() {
270 DropInfo info;
271
272 info.type = CatalogType::SCHEMA_ENTRY;
273 info.name = source.Read<string>();
274 if (deserialize_only) {
275 return;
276 }
277
278 catalog.DropEntry(context, info);
279}
280
281//===--------------------------------------------------------------------===//
282// Replay Custom Type
283//===--------------------------------------------------------------------===//
284void ReplayState::ReplayCreateType() {
285 auto info = TypeCatalogEntry::Deserialize(source);
286 info->on_conflict = OnCreateConflict::IGNORE_ON_CONFLICT;
287 catalog.CreateType(context, info&: *info);
288}
289
290void ReplayState::ReplayDropType() {
291 DropInfo info;
292
293 info.type = CatalogType::TYPE_ENTRY;
294 info.schema = source.Read<string>();
295 info.name = source.Read<string>();
296 if (deserialize_only) {
297 return;
298 }
299
300 catalog.DropEntry(context, info);
301}
302
303//===--------------------------------------------------------------------===//
304// Replay Sequence
305//===--------------------------------------------------------------------===//
306void ReplayState::ReplayCreateSequence() {
307 auto entry = SequenceCatalogEntry::Deserialize(source);
308 if (deserialize_only) {
309 return;
310 }
311
312 catalog.CreateSequence(context, info&: *entry);
313}
314
315void ReplayState::ReplayDropSequence() {
316 DropInfo info;
317 info.type = CatalogType::SEQUENCE_ENTRY;
318 info.schema = source.Read<string>();
319 info.name = source.Read<string>();
320 if (deserialize_only) {
321 return;
322 }
323
324 catalog.DropEntry(context, info);
325}
326
327void ReplayState::ReplaySequenceValue() {
328 auto schema = source.Read<string>();
329 auto name = source.Read<string>();
330 auto usage_count = source.Read<uint64_t>();
331 auto counter = source.Read<int64_t>();
332 if (deserialize_only) {
333 return;
334 }
335
336 // fetch the sequence from the catalog
337 auto &seq = catalog.GetEntry<SequenceCatalogEntry>(context, schema_name: schema, name);
338 if (usage_count > seq.usage_count) {
339 seq.usage_count = usage_count;
340 seq.counter = counter;
341 }
342}
343
344//===--------------------------------------------------------------------===//
345// Replay Macro
346//===--------------------------------------------------------------------===//
347void ReplayState::ReplayCreateMacro() {
348 auto entry = ScalarMacroCatalogEntry::Deserialize(main_source&: source, context);
349 if (deserialize_only) {
350 return;
351 }
352
353 catalog.CreateFunction(context, info&: *entry);
354}
355
356void ReplayState::ReplayDropMacro() {
357 DropInfo info;
358 info.type = CatalogType::MACRO_ENTRY;
359 info.schema = source.Read<string>();
360 info.name = source.Read<string>();
361 if (deserialize_only) {
362 return;
363 }
364
365 catalog.DropEntry(context, info);
366}
367
368//===--------------------------------------------------------------------===//
369// Replay Table Macro
370//===--------------------------------------------------------------------===//
371void ReplayState::ReplayCreateTableMacro() {
372 auto entry = TableMacroCatalogEntry::Deserialize(main_source&: source, context);
373 if (deserialize_only) {
374 return;
375 }
376
377 catalog.CreateFunction(context, info&: *entry);
378}
379
380void ReplayState::ReplayDropTableMacro() {
381 DropInfo info;
382 info.type = CatalogType::TABLE_MACRO_ENTRY;
383 info.schema = source.Read<string>();
384 info.name = source.Read<string>();
385 if (deserialize_only) {
386 return;
387 }
388
389 catalog.DropEntry(context, info);
390}
391
392//===--------------------------------------------------------------------===//
393// Replay Index
394//===--------------------------------------------------------------------===//
395void ReplayState::ReplayCreateIndex() {
396 auto info = IndexCatalogEntry::Deserialize(source, context);
397 if (deserialize_only) {
398 return;
399 }
400
401 // get the physical table to which we'll add the index
402 auto &table = catalog.GetEntry<TableCatalogEntry>(context, schema_name: info->schema, name: info->table->table_name);
403 auto &data_table = table.GetStorage();
404
405 // bind the parsed expressions
406 if (info->expressions.empty()) {
407 for (auto &parsed_expr : info->parsed_expressions) {
408 info->expressions.push_back(x: parsed_expr->Copy());
409 }
410 }
411 auto binder = Binder::CreateBinder(context);
412 auto expressions = binder->BindCreateIndexExpressions(table, info&: *info);
413
414 // create the empty index
415 unique_ptr<Index> index;
416 switch (info->index_type) {
417 case IndexType::ART: {
418 index = make_uniq<ART>(args&: info->column_ids, args&: TableIOManager::Get(table&: data_table), args&: expressions, args&: info->constraint_type,
419 args&: data_table.db);
420 break;
421 }
422 default:
423 throw InternalException("Unimplemented index type");
424 }
425
426 // add the index to the catalog
427 auto &index_entry = catalog.CreateIndex(context, info&: *info)->Cast<DuckIndexEntry>();
428 index_entry.index = index.get();
429 index_entry.info = data_table.info;
430 for (auto &parsed_expr : info->parsed_expressions) {
431 index_entry.parsed_expressions.push_back(x: parsed_expr->Copy());
432 }
433
434 // physically add the index to the data table storage
435 data_table.WALAddIndex(context, index: std::move(index), expressions);
436}
437
438void ReplayState::ReplayDropIndex() {
439 DropInfo info;
440 info.type = CatalogType::INDEX_ENTRY;
441 info.schema = source.Read<string>();
442 info.name = source.Read<string>();
443 if (deserialize_only) {
444 return;
445 }
446
447 catalog.DropEntry(context, info);
448}
449
450//===--------------------------------------------------------------------===//
451// Replay Data
452//===--------------------------------------------------------------------===//
453void ReplayState::ReplayUseTable() {
454 auto schema_name = source.Read<string>();
455 auto table_name = source.Read<string>();
456 if (deserialize_only) {
457 return;
458 }
459 current_table = &catalog.GetEntry<TableCatalogEntry>(context, schema_name, name: table_name);
460}
461
462void ReplayState::ReplayInsert() {
463 DataChunk chunk;
464 chunk.Deserialize(source);
465 if (deserialize_only) {
466 return;
467 }
468 if (!current_table) {
469 throw Exception("Corrupt WAL: insert without table");
470 }
471
472 // append to the current table
473 current_table->GetStorage().LocalAppend(table&: *current_table, context, chunk);
474}
475
476void ReplayState::ReplayDelete() {
477 DataChunk chunk;
478 chunk.Deserialize(source);
479 if (deserialize_only) {
480 return;
481 }
482 if (!current_table) {
483 throw InternalException("Corrupt WAL: delete without table");
484 }
485
486 D_ASSERT(chunk.ColumnCount() == 1 && chunk.data[0].GetType() == LogicalType::ROW_TYPE);
487 row_t row_ids[1];
488 Vector row_identifiers(LogicalType::ROW_TYPE, data_ptr_cast(src: row_ids));
489
490 auto source_ids = FlatVector::GetData<row_t>(vector&: chunk.data[0]);
491 // delete the tuples from the current table
492 for (idx_t i = 0; i < chunk.size(); i++) {
493 row_ids[0] = source_ids[i];
494 current_table->GetStorage().Delete(table&: *current_table, context, row_identifiers, count: 1);
495 }
496}
497
498void ReplayState::ReplayUpdate() {
499 vector<column_t> column_path;
500 auto column_index_count = source.Read<idx_t>();
501 column_path.reserve(n: column_index_count);
502 for (idx_t i = 0; i < column_index_count; i++) {
503 column_path.push_back(x: source.Read<column_t>());
504 }
505 DataChunk chunk;
506 chunk.Deserialize(source);
507 if (deserialize_only) {
508 return;
509 }
510 if (!current_table) {
511 throw InternalException("Corrupt WAL: update without table");
512 }
513
514 if (column_path[0] >= current_table->GetColumns().PhysicalColumnCount()) {
515 throw InternalException("Corrupt WAL: column index for update out of bounds");
516 }
517
518 // remove the row id vector from the chunk
519 auto row_ids = std::move(chunk.data.back());
520 chunk.data.pop_back();
521
522 // now perform the update
523 current_table->GetStorage().UpdateColumn(table&: *current_table, context, row_ids, column_path, updates&: chunk);
524}
525
526void ReplayState::ReplayCheckpoint() {
527 checkpoint_id = source.Read<block_id_t>();
528}
529
530} // namespace duckdb
531