1#include "duckdb/storage/checkpoint_manager.hpp"
2
3#include "duckdb/catalog/duck_catalog.hpp"
4#include "duckdb/catalog/catalog_entry/duck_index_entry.hpp"
5#include "duckdb/catalog/catalog_entry/scalar_macro_catalog_entry.hpp"
6#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
7#include "duckdb/catalog/catalog_entry/sequence_catalog_entry.hpp"
8#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
9#include "duckdb/catalog/catalog_entry/type_catalog_entry.hpp"
10#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
11#include "duckdb/common/field_writer.hpp"
12#include "duckdb/common/serializer.hpp"
13#include "duckdb/execution/index/art/art.hpp"
14#include "duckdb/main/client_context.hpp"
15#include "duckdb/main/config.hpp"
16#include "duckdb/main/connection.hpp"
17#include "duckdb/main/database.hpp"
18#include "duckdb/parser/column_definition.hpp"
19#include "duckdb/parser/parsed_data/create_schema_info.hpp"
20#include "duckdb/parser/parsed_data/create_table_info.hpp"
21#include "duckdb/parser/parsed_data/create_view_info.hpp"
22#include "duckdb/parser/tableref/basetableref.hpp"
23#include "duckdb/planner/binder.hpp"
24#include "duckdb/planner/bound_tableref.hpp"
25#include "duckdb/planner/expression_binder/index_binder.hpp"
26#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
27#include "duckdb/storage/block_manager.hpp"
28#include "duckdb/storage/checkpoint/table_data_reader.hpp"
29#include "duckdb/storage/checkpoint/table_data_writer.hpp"
30#include "duckdb/storage/meta_block_reader.hpp"
31#include "duckdb/storage/table/column_checkpoint_state.hpp"
32#include "duckdb/transaction/transaction_manager.hpp"
33#include "duckdb/main/attached_database.hpp"
34
35namespace duckdb {
36
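// forward declaration: reorders the table list so that tables referenced through foreign key
// constraints are written before the tables that reference them (see WriteSchema below)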
void ReorderTableEntries(vector<reference<TableCatalogEntry>> &tables);

SingleFileCheckpointWriter::SingleFileCheckpointWriter(AttachedDatabase &db, BlockManager &block_manager)
    : CheckpointWriter(db), partial_block_manager(block_manager, CheckpointType::FULL_CHECKPOINT) {
}

BlockManager &SingleFileCheckpointWriter::GetBlockManager() {
	auto &storage_manager = db.GetStorageManager().Cast<SingleFileStorageManager>();
	return *storage_manager.block_manager;
}

MetaBlockWriter &SingleFileCheckpointWriter::GetMetaBlockWriter() {
	return *metadata_writer;
}

unique_ptr<TableDataWriter> SingleFileCheckpointWriter::GetTableDataWriter(TableCatalogEntry &table) {
	return make_uniq<SingleFileTableDataWriter>(*this, table, *table_metadata_writer, GetMetaBlockWriter());
}

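// Writes a full checkpoint of this database to disk:
// 1. serialize all committed schemas (and their tables, views, sequences, types, macros and indexes)
// 2. flush the partial block manager and the metadata streams
// 3. write a CHECKPOINT entry to the WAL so replay can detect a completed checkpoint
// 4. write the database header pointing at the new metadata block, then truncate the WAL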
void SingleFileCheckpointWriter::CreateCheckpoint() {
	auto &config = DBConfig::Get(db);
	auto &storage_manager = db.GetStorageManager().Cast<SingleFileStorageManager>();
	if (storage_manager.InMemory()) {
		return;
	}
	// assert that the checkpoint manager hasn't been used before
	D_ASSERT(!metadata_writer);

	auto &block_manager = GetBlockManager();

	//! Set up the writers for the checkpoints
	metadata_writer = make_uniq<MetaBlockWriter>(block_manager);
	table_metadata_writer = make_uniq<MetaBlockWriter>(block_manager);

	// get the id of the first meta block
	block_id_t meta_block = metadata_writer->GetBlockPointer().block_id;

	vector<reference<SchemaCatalogEntry>> schemas;
	// we scan the set of committed schemas
	auto &catalog = Catalog::GetCatalog(db).Cast<DuckCatalog>();
	catalog.ScanSchemas([&](SchemaCatalogEntry &entry) { schemas.push_back(entry); });
	// write the actual data into the database
	// write the number of schemas
	metadata_writer->Write<uint32_t>(schemas.size());
	for (auto &schema : schemas) {
		WriteSchema(schema.get());
	}
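	// flush any partially filled blocks that were accumulated while writing the table data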
	partial_block_manager.FlushPartialBlocks();
	// flush the meta data to disk
	metadata_writer->Flush();
	table_metadata_writer->Flush();

	// write a checkpoint flag to the WAL
	// this protects against the rare event that the database crashes AFTER writing the file, but BEFORE truncating
	// the WAL: we write an entry CHECKPOINT "meta_block_id" into the WAL. upon loading, if we see an entry
	// CHECKPOINT "meta_block_id" whose id MATCHES the meta block id in the header, we know the database was
	// successfully checkpointed, so we skip replaying the WAL to avoid duplicating data
	auto wal = storage_manager.GetWriteAheadLog();
	wal->WriteCheckpoint(meta_block);
	wal->Flush();

	if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_HEADER) {
		throw FatalException("Checkpoint aborted before header write because of PRAGMA checkpoint_abort flag");
	}

	// finally write the updated header
	DatabaseHeader header;
	header.meta_block = meta_block;
	block_manager.WriteHeader(header);

	if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_TRUNCATE) {
		throw FatalException("Checkpoint aborted before truncate because of PRAGMA checkpoint_abort flag");
	}

	// truncate the WAL
	wal->Truncate(0);

	// mark all blocks written as part of the metadata as modified
	metadata_writer->MarkWrittenBlocks();
	table_metadata_writer->MarkWrittenBlocks();
}

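// Reads the checkpoint (if any) from the database file and replays the catalog inside a single transaction.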
void SingleFileCheckpointReader::LoadFromStorage() {
	auto &block_manager = *storage.block_manager;
	block_id_t meta_block = block_manager.GetMetaBlock();
	if (meta_block < 0) {
		// storage is empty
		return;
	}

	Connection con(storage.GetDatabase());
	con.BeginTransaction();
	// create the MetaBlockReader to read from the storage
	MetaBlockReader reader(block_manager, meta_block);
	reader.SetCatalog(catalog.GetAttached().GetCatalog());
	reader.SetContext(*con.context);
	LoadCheckpoint(*con.context, reader);
	con.Commit();
}

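// The checkpoint metadata mirrors CreateCheckpoint: a schema count followed by the serialized schemas.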
void CheckpointReader::LoadCheckpoint(ClientContext &context, MetaBlockReader &reader) {
	uint32_t schema_count = reader.Read<uint32_t>();
	for (uint32_t i = 0; i < schema_count; i++) {
		ReadSchema(context, reader);
	}
}

//===--------------------------------------------------------------------===//
// Schema
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteSchema(SchemaCatalogEntry &schema) {
	// write the schema data
	schema.Serialize(GetMetaBlockWriter());
	// then, we fetch the tables/views/sequences information
	vector<reference<TableCatalogEntry>> tables;
	vector<reference<ViewCatalogEntry>> views;
	schema.Scan(CatalogType::TABLE_ENTRY, [&](CatalogEntry &entry) {
		if (entry.internal) {
			return;
		}
		if (entry.type == CatalogType::TABLE_ENTRY) {
			tables.push_back(entry.Cast<TableCatalogEntry>());
		} else if (entry.type == CatalogType::VIEW_ENTRY) {
			views.push_back(entry.Cast<ViewCatalogEntry>());
		} else {
			throw NotImplementedException("Catalog type for entries");
		}
	});
	vector<reference<SequenceCatalogEntry>> sequences;
	schema.Scan(CatalogType::SEQUENCE_ENTRY, [&](CatalogEntry &entry) {
		if (entry.internal) {
			return;
		}
		sequences.push_back(entry.Cast<SequenceCatalogEntry>());
	});

	vector<reference<TypeCatalogEntry>> custom_types;
	schema.Scan(CatalogType::TYPE_ENTRY, [&](CatalogEntry &entry) {
		if (entry.internal) {
			return;
		}
		custom_types.push_back(entry.Cast<TypeCatalogEntry>());
	});

	vector<reference<ScalarMacroCatalogEntry>> macros;
	schema.Scan(CatalogType::SCALAR_FUNCTION_ENTRY, [&](CatalogEntry &entry) {
		if (entry.internal) {
			return;
		}
		if (entry.type == CatalogType::MACRO_ENTRY) {
			macros.push_back(entry.Cast<ScalarMacroCatalogEntry>());
		}
	});

	vector<reference<TableMacroCatalogEntry>> table_macros;
	schema.Scan(CatalogType::TABLE_FUNCTION_ENTRY, [&](CatalogEntry &entry) {
		if (entry.internal) {
			return;
		}
		if (entry.type == CatalogType::TABLE_MACRO_ENTRY) {
			table_macros.push_back(entry.Cast<TableMacroCatalogEntry>());
		}
	});

	vector<reference<IndexCatalogEntry>> indexes;
	schema.Scan(CatalogType::INDEX_ENTRY, [&](CatalogEntry &entry) {
		D_ASSERT(!entry.internal);
		indexes.push_back(entry.Cast<IndexCatalogEntry>());
	});

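	// write the counts of each entry type up front so that ReadSchema knows how many entries to deserialize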
	FieldWriter writer(GetMetaBlockWriter());
	writer.WriteField<uint32_t>(custom_types.size());
	writer.WriteField<uint32_t>(sequences.size());
	writer.WriteField<uint32_t>(tables.size());
	writer.WriteField<uint32_t>(views.size());
	writer.WriteField<uint32_t>(macros.size());
	writer.WriteField<uint32_t>(table_macros.size());
	writer.WriteField<uint32_t>(indexes.size());
	writer.Finalize();

	// write the custom types
	for (auto &custom_type : custom_types) {
		WriteType(custom_type);
	}

	// write the sequences
	for (auto &seq : sequences) {
		WriteSequence(seq);
	}
	// reorder the tables so that tables referenced by foreign key constraints are written first
	ReorderTableEntries(tables);
	// write the tables
	for (auto &table : tables) {
		WriteTable(table);
	}
	// write the views
	for (auto &view : views) {
		WriteView(view);
	}

	// write the macros
	for (auto &macro : macros) {
		WriteMacro(macro);
	}

	// write the table macros
	for (auto &macro : table_macros) {
		WriteTableMacro(macro);
	}
	// write the indexes
	for (auto &index : indexes) {
		WriteIndex(index);
	}
}

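// Mirrors WriteSchema: recreate the schema, read the per-type counts, then deserialize
// the entries in the same order in which they were written.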
void CheckpointReader::ReadSchema(ClientContext &context, MetaBlockReader &reader) {
	// read the schema and create it in the catalog
	auto info = SchemaCatalogEntry::Deserialize(reader);
	// set the on-conflict behavior to IGNORE so that recreating the default schema does not fail
	info->on_conflict = OnCreateConflict::IGNORE_ON_CONFLICT;
	catalog.CreateSchema(context, *info);

	// first read all the counts
	FieldReader field_reader(reader);
	uint32_t enum_count = field_reader.ReadRequired<uint32_t>();
	uint32_t seq_count = field_reader.ReadRequired<uint32_t>();
	uint32_t table_count = field_reader.ReadRequired<uint32_t>();
	uint32_t view_count = field_reader.ReadRequired<uint32_t>();
	uint32_t macro_count = field_reader.ReadRequired<uint32_t>();
	uint32_t table_macro_count = field_reader.ReadRequired<uint32_t>();
	uint32_t table_index_count = field_reader.ReadRequired<uint32_t>();
	field_reader.Finalize();

	// now read the enums
	for (uint32_t i = 0; i < enum_count; i++) {
		ReadType(context, reader);
	}

	// read the sequences
	for (uint32_t i = 0; i < seq_count; i++) {
		ReadSequence(context, reader);
	}
	// recreate the tables
	for (uint32_t i = 0; i < table_count; i++) {
		ReadTable(context, reader);
	}
	// now read the views
	for (uint32_t i = 0; i < view_count; i++) {
		ReadView(context, reader);
	}

	// read the macros
	for (uint32_t i = 0; i < macro_count; i++) {
		ReadMacro(context, reader);
	}

	for (uint32_t i = 0; i < table_macro_count; i++) {
		ReadTableMacro(context, reader);
	}
	for (uint32_t i = 0; i < table_index_count; i++) {
		ReadIndex(context, reader);
	}
}

//===--------------------------------------------------------------------===//
// Views
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteView(ViewCatalogEntry &view) {
	view.Serialize(GetMetaBlockWriter());
}

void CheckpointReader::ReadView(ClientContext &context, MetaBlockReader &reader) {
	auto info = ViewCatalogEntry::Deserialize(reader, context);
	catalog.CreateView(context, *info);
}

//===--------------------------------------------------------------------===//
// Sequences
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteSequence(SequenceCatalogEntry &seq) {
	seq.Serialize(GetMetaBlockWriter());
}

void CheckpointReader::ReadSequence(ClientContext &context, MetaBlockReader &reader) {
	auto info = SequenceCatalogEntry::Deserialize(reader);
	catalog.CreateSequence(context, *info);
}

//===--------------------------------------------------------------------===//
// Indexes
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteIndex(IndexCatalogEntry &index_catalog) {
	// The index data should already have been written as part of WriteTableData.
	// Here, we need only serialize the pointer to that data.
	auto root_offset = index_catalog.index->GetSerializedDataPointer();
	auto &metadata_writer = GetMetaBlockWriter();
	index_catalog.Serialize(metadata_writer);
	// serialize the block id and offset of the root node
	metadata_writer.Write(root_offset.block_id);
	metadata_writer.Write(root_offset.offset);
}

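// Recreates an index from the checkpoint: deserialize the catalog entry, bind its expressions
// against the indexed table, and attach an ART that is loaded lazily from the stored root block pointer.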
void CheckpointReader::ReadIndex(ClientContext &context, MetaBlockReader &reader) {
	// deserialize the index metadata
	auto info = IndexCatalogEntry::Deserialize(reader, context);

	// create the index in the catalog
	auto &schema_catalog = catalog.GetSchema(context, info->schema);
	auto &table_catalog = catalog.GetEntry(context, CatalogType::TABLE_ENTRY, info->schema, info->table->table_name)
	                          .Cast<DuckTableEntry>();
	auto &index_catalog = schema_catalog.CreateIndex(context, *info, table_catalog)->Cast<DuckIndexEntry>();
	index_catalog.info = table_catalog.GetStorage().info;

	// we deserialize the index lazily, i.e., we do not need to load any node information
	// except the root block id and offset
	auto root_block_id = reader.Read<block_id_t>();
	auto root_offset = reader.Read<uint32_t>();

	// obtain the expressions of the ART from the index metadata
	vector<unique_ptr<Expression>> unbound_expressions;
	vector<unique_ptr<ParsedExpression>> parsed_expressions;
	for (auto &p_exp : info->parsed_expressions) {
		parsed_expressions.push_back(p_exp->Copy());
	}

	// bind the parsed expressions
	// add the table to the bind context
	auto binder = Binder::CreateBinder(context);
	vector<LogicalType> column_types;
	vector<string> column_names;
	for (auto &col : table_catalog.GetColumns().Logical()) {
		column_types.push_back(col.Type());
		column_names.push_back(col.Name());
	}
	vector<column_t> column_ids;
	binder->bind_context.AddBaseTable(0, info->table->table_name, column_names, column_types, column_ids,
	                                  &table_catalog);
	IndexBinder idx_binder(*binder, context);
	unbound_expressions.reserve(parsed_expressions.size());
	for (auto &expr : parsed_expressions) {
		unbound_expressions.push_back(idx_binder.Bind(expr));
	}

	if (parsed_expressions.empty()) {
		// this is a PK/FK index: we create the necessary bound column ref expressions
		unbound_expressions.reserve(info->column_ids.size());
		for (idx_t key_nr = 0; key_nr < info->column_ids.size(); key_nr++) {
			auto &col = table_catalog.GetColumn(LogicalIndex(info->column_ids[key_nr]));
			unbound_expressions.push_back(
			    make_uniq<BoundColumnRefExpression>(col.GetName(), col.GetType(), ColumnBinding(0, key_nr)));
		}
	}

	// create the index and add it to the storage
	switch (info->index_type) {
	case IndexType::ART: {
		auto &storage = table_catalog.GetStorage();
		auto art = make_uniq<ART>(info->column_ids, TableIOManager::Get(storage), std::move(unbound_expressions),
		                          info->constraint_type, storage.db, root_block_id, root_offset);
		index_catalog.index = art.get();
		storage.info->indexes.AddIndex(std::move(art));
		break;
	}
	default:
		throw InternalException("Unknown index type for ReadIndex");
	}
}

//===--------------------------------------------------------------------===//
// Custom Types
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteType(TypeCatalogEntry &type) {
	type.Serialize(GetMetaBlockWriter());
}

void CheckpointReader::ReadType(ClientContext &context, MetaBlockReader &reader) {
	auto info = TypeCatalogEntry::Deserialize(reader);
	auto &catalog_entry = catalog.CreateType(context, *info)->Cast<TypeCatalogEntry>();
	if (info->type.id() == LogicalTypeId::ENUM) {
		EnumType::SetCatalog(info->type, &catalog_entry);
	}
}

//===--------------------------------------------------------------------===//
// Macros
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteMacro(ScalarMacroCatalogEntry &macro) {
	macro.Serialize(GetMetaBlockWriter());
}

void CheckpointReader::ReadMacro(ClientContext &context, MetaBlockReader &reader) {
	auto info = MacroCatalogEntry::Deserialize(reader, context);
	catalog.CreateFunction(context, *info);
}

void CheckpointWriter::WriteTableMacro(TableMacroCatalogEntry &macro) {
	macro.Serialize(GetMetaBlockWriter());
}

void CheckpointReader::ReadTableMacro(ClientContext &context, MetaBlockReader &reader) {
	auto info = MacroCatalogEntry::Deserialize(reader, context);
	catalog.CreateFunction(context, *info);
}

//===--------------------------------------------------------------------===//
// Table Metadata
//===--------------------------------------------------------------------===//
void CheckpointWriter::WriteTable(TableCatalogEntry &table) {
	// write the table meta data
	table.Serialize(GetMetaBlockWriter());
	// now we need to write the table data.
	if (auto writer = GetTableDataWriter(table)) {
		writer->WriteTableData();
	}
}

void CheckpointReader::ReadTable(ClientContext &context, MetaBlockReader &reader) {
	// deserialize the table meta data
	auto info = TableCatalogEntry::Deserialize(reader, context);
	// bind the info
	auto binder = Binder::CreateBinder(context);
	auto &schema = catalog.GetSchema(context, info->schema);
	auto bound_info = binder->BindCreateTableInfo(std::move(info), schema);

	// now read the actual table data and place it into the create table info
	ReadTableData(context, reader, *bound_info);

	// finally create the table in the catalog
	catalog.CreateTable(context, *bound_info);
}

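// Table data is stored as a pointer (block id + offset) into the table metadata stream,
// followed by the total row count and the block pointers of any indexes.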
void CheckpointReader::ReadTableData(ClientContext &context, MetaBlockReader &reader,
                                     BoundCreateTableInfo &bound_info) {
	auto block_id = reader.Read<block_id_t>();
	auto offset = reader.Read<uint64_t>();

	MetaBlockReader table_data_reader(reader.block_manager, block_id);
	table_data_reader.offset = offset;
	TableDataReader data_reader(table_data_reader, bound_info);

	data_reader.ReadTableData();
	bound_info.data->total_rows = reader.Read<idx_t>();

	// read the block pointers of any indexes
	idx_t num_indexes = reader.Read<idx_t>();
	for (idx_t i = 0; i < num_indexes; i++) {
		auto idx_block_id = reader.Read<idx_t>();
		auto idx_offset = reader.Read<idx_t>();
		bound_info.indexes.emplace_back(idx_block_id, idx_offset);
	}
}

} // namespace duckdb