1 | #include "duckdb/storage/checkpoint_manager.hpp" |
2 | |
3 | #include "duckdb/catalog/duck_catalog.hpp" |
4 | #include "duckdb/catalog/catalog_entry/duck_index_entry.hpp" |
5 | #include "duckdb/catalog/catalog_entry/scalar_macro_catalog_entry.hpp" |
6 | #include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" |
7 | #include "duckdb/catalog/catalog_entry/sequence_catalog_entry.hpp" |
8 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
9 | #include "duckdb/catalog/catalog_entry/type_catalog_entry.hpp" |
10 | #include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp" |
11 | #include "duckdb/common/field_writer.hpp" |
12 | #include "duckdb/common/serializer.hpp" |
13 | #include "duckdb/execution/index/art/art.hpp" |
14 | #include "duckdb/main/client_context.hpp" |
15 | #include "duckdb/main/config.hpp" |
16 | #include "duckdb/main/connection.hpp" |
17 | #include "duckdb/main/database.hpp" |
18 | #include "duckdb/parser/column_definition.hpp" |
19 | #include "duckdb/parser/parsed_data/create_schema_info.hpp" |
20 | #include "duckdb/parser/parsed_data/create_table_info.hpp" |
21 | #include "duckdb/parser/parsed_data/create_view_info.hpp" |
22 | #include "duckdb/parser/tableref/basetableref.hpp" |
23 | #include "duckdb/planner/binder.hpp" |
24 | #include "duckdb/planner/bound_tableref.hpp" |
25 | #include "duckdb/planner/expression_binder/index_binder.hpp" |
26 | #include "duckdb/planner/parsed_data/bound_create_table_info.hpp" |
27 | #include "duckdb/storage/block_manager.hpp" |
28 | #include "duckdb/storage/checkpoint/table_data_reader.hpp" |
29 | #include "duckdb/storage/checkpoint/table_data_writer.hpp" |
30 | #include "duckdb/storage/meta_block_reader.hpp" |
31 | #include "duckdb/storage/table/column_checkpoint_state.hpp" |
32 | #include "duckdb/transaction/transaction_manager.hpp" |
33 | #include "duckdb/main/attached_database.hpp" |
34 | |
35 | namespace duckdb { |
36 | |
37 | void ReorderTableEntries(vector<reference<TableCatalogEntry>> &tables); |
38 | |
39 | SingleFileCheckpointWriter::SingleFileCheckpointWriter(AttachedDatabase &db, BlockManager &block_manager) |
40 | : CheckpointWriter(db), partial_block_manager(block_manager, CheckpointType::FULL_CHECKPOINT) { |
41 | } |
42 | |
43 | BlockManager &SingleFileCheckpointWriter::GetBlockManager() { |
44 | auto &storage_manager = db.GetStorageManager().Cast<SingleFileStorageManager>(); |
45 | return *storage_manager.block_manager; |
46 | } |
47 | |
48 | MetaBlockWriter &SingleFileCheckpointWriter::GetMetaBlockWriter() { |
49 | return *metadata_writer; |
50 | } |
51 | |
52 | unique_ptr<TableDataWriter> SingleFileCheckpointWriter::GetTableDataWriter(TableCatalogEntry &table) { |
53 | return make_uniq<SingleFileTableDataWriter>(args&: *this, args&: table, args&: *table_metadata_writer, args&: GetMetaBlockWriter()); |
54 | } |
55 | |
56 | void SingleFileCheckpointWriter::CreateCheckpoint() { |
57 | auto &config = DBConfig::Get(db); |
58 | auto &storage_manager = db.GetStorageManager().Cast<SingleFileStorageManager>(); |
59 | if (storage_manager.InMemory()) { |
60 | return; |
61 | } |
62 | // assert that the checkpoint manager hasn't been used before |
63 | D_ASSERT(!metadata_writer); |
64 | |
65 | auto &block_manager = GetBlockManager(); |
66 | |
67 | //! Set up the writers for the checkpoints |
68 | metadata_writer = make_uniq<MetaBlockWriter>(args&: block_manager); |
69 | table_metadata_writer = make_uniq<MetaBlockWriter>(args&: block_manager); |
70 | |
71 | // get the id of the first meta block |
72 | block_id_t meta_block = metadata_writer->GetBlockPointer().block_id; |
73 | |
74 | vector<reference<SchemaCatalogEntry>> schemas; |
75 | // we scan the set of committed schemas |
76 | auto &catalog = Catalog::GetCatalog(db).Cast<DuckCatalog>(); |
77 | catalog.ScanSchemas(callback: [&](SchemaCatalogEntry &entry) { schemas.push_back(x: entry); }); |
78 | // write the actual data into the database |
79 | // write the amount of schemas |
80 | metadata_writer->Write<uint32_t>(element: schemas.size()); |
81 | for (auto &schema : schemas) { |
82 | WriteSchema(schema&: schema.get()); |
83 | } |
84 | partial_block_manager.FlushPartialBlocks(); |
85 | // flush the meta data to disk |
86 | metadata_writer->Flush(); |
87 | table_metadata_writer->Flush(); |
88 | |
89 | // write a checkpoint flag to the WAL |
90 | // this protects against the rare event that the database crashes AFTER writing the file, but BEFORE truncating the |
91 | // WAL we write an entry CHECKPOINT "meta_block_id" into the WAL upon loading, if we see there is an entry |
92 | // CHECKPOINT "meta_block_id", and the id MATCHES the head idin the file we know that the database was successfully |
93 | // checkpointed, so we know that we should avoid replaying the WAL to avoid duplicating data |
94 | auto wal = storage_manager.GetWriteAheadLog(); |
95 | wal->WriteCheckpoint(meta_block); |
96 | wal->Flush(); |
97 | |
98 | if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_HEADER) { |
99 | throw FatalException("Checkpoint aborted before header write because of PRAGMA checkpoint_abort flag" ); |
100 | } |
101 | |
102 | // finally write the updated header |
103 | DatabaseHeader ; |
104 | header.meta_block = meta_block; |
105 | block_manager.WriteHeader(header); |
106 | |
107 | if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_TRUNCATE) { |
108 | throw FatalException("Checkpoint aborted before truncate because of PRAGMA checkpoint_abort flag" ); |
109 | } |
110 | |
111 | // truncate the WAL |
112 | wal->Truncate(size: 0); |
113 | |
114 | // mark all blocks written as part of the metadata as modified |
115 | metadata_writer->MarkWrittenBlocks(); |
116 | table_metadata_writer->MarkWrittenBlocks(); |
117 | } |
118 | |
119 | void SingleFileCheckpointReader::LoadFromStorage() { |
120 | auto &block_manager = *storage.block_manager; |
121 | block_id_t meta_block = block_manager.GetMetaBlock(); |
122 | if (meta_block < 0) { |
123 | // storage is empty |
124 | return; |
125 | } |
126 | |
127 | Connection con(storage.GetDatabase()); |
128 | con.BeginTransaction(); |
129 | // create the MetaBlockReader to read from the storage |
130 | MetaBlockReader reader(block_manager, meta_block); |
131 | reader.SetCatalog(catalog.GetAttached().GetCatalog()); |
132 | reader.SetContext(*con.context); |
133 | LoadCheckpoint(context&: *con.context, reader); |
134 | con.Commit(); |
135 | } |
136 | |
137 | void CheckpointReader::LoadCheckpoint(ClientContext &context, MetaBlockReader &reader) { |
138 | uint32_t schema_count = reader.Read<uint32_t>(); |
139 | for (uint32_t i = 0; i < schema_count; i++) { |
140 | ReadSchema(context, reader); |
141 | } |
142 | } |
143 | |
144 | //===--------------------------------------------------------------------===// |
145 | // Schema |
146 | //===--------------------------------------------------------------------===// |
147 | void CheckpointWriter::WriteSchema(SchemaCatalogEntry &schema) { |
148 | // write the schema data |
149 | schema.Serialize(serializer&: GetMetaBlockWriter()); |
150 | // then, we fetch the tables/views/sequences information |
151 | vector<reference<TableCatalogEntry>> tables; |
152 | vector<reference<ViewCatalogEntry>> views; |
153 | schema.Scan(type: CatalogType::TABLE_ENTRY, callback: [&](CatalogEntry &entry) { |
154 | if (entry.internal) { |
155 | return; |
156 | } |
157 | if (entry.type == CatalogType::TABLE_ENTRY) { |
158 | tables.push_back(x: entry.Cast<TableCatalogEntry>()); |
159 | } else if (entry.type == CatalogType::VIEW_ENTRY) { |
160 | views.push_back(x: entry.Cast<ViewCatalogEntry>()); |
161 | } else { |
162 | throw NotImplementedException("Catalog type for entries" ); |
163 | } |
164 | }); |
165 | vector<reference<SequenceCatalogEntry>> sequences; |
166 | schema.Scan(type: CatalogType::SEQUENCE_ENTRY, callback: [&](CatalogEntry &entry) { |
167 | if (entry.internal) { |
168 | return; |
169 | } |
170 | sequences.push_back(x: entry.Cast<SequenceCatalogEntry>()); |
171 | }); |
172 | |
173 | vector<reference<TypeCatalogEntry>> custom_types; |
174 | schema.Scan(type: CatalogType::TYPE_ENTRY, callback: [&](CatalogEntry &entry) { |
175 | if (entry.internal) { |
176 | return; |
177 | } |
178 | custom_types.push_back(x: entry.Cast<TypeCatalogEntry>()); |
179 | }); |
180 | |
181 | vector<reference<ScalarMacroCatalogEntry>> macros; |
182 | schema.Scan(type: CatalogType::SCALAR_FUNCTION_ENTRY, callback: [&](CatalogEntry &entry) { |
183 | if (entry.internal) { |
184 | return; |
185 | } |
186 | if (entry.type == CatalogType::MACRO_ENTRY) { |
187 | macros.push_back(x: entry.Cast<ScalarMacroCatalogEntry>()); |
188 | } |
189 | }); |
190 | |
191 | vector<reference<TableMacroCatalogEntry>> table_macros; |
192 | schema.Scan(type: CatalogType::TABLE_FUNCTION_ENTRY, callback: [&](CatalogEntry &entry) { |
193 | if (entry.internal) { |
194 | return; |
195 | } |
196 | if (entry.type == CatalogType::TABLE_MACRO_ENTRY) { |
197 | table_macros.push_back(x: entry.Cast<TableMacroCatalogEntry>()); |
198 | } |
199 | }); |
200 | |
201 | vector<reference<IndexCatalogEntry>> indexes; |
202 | schema.Scan(type: CatalogType::INDEX_ENTRY, callback: [&](CatalogEntry &entry) { |
203 | D_ASSERT(!entry.internal); |
204 | indexes.push_back(x: entry.Cast<IndexCatalogEntry>()); |
205 | }); |
206 | |
207 | FieldWriter writer(GetMetaBlockWriter()); |
208 | writer.WriteField<uint32_t>(element: custom_types.size()); |
209 | writer.WriteField<uint32_t>(element: sequences.size()); |
210 | writer.WriteField<uint32_t>(element: tables.size()); |
211 | writer.WriteField<uint32_t>(element: views.size()); |
212 | writer.WriteField<uint32_t>(element: macros.size()); |
213 | writer.WriteField<uint32_t>(element: table_macros.size()); |
214 | writer.WriteField<uint32_t>(element: indexes.size()); |
215 | writer.Finalize(); |
216 | |
217 | // write the custom_types |
218 | for (auto &custom_type : custom_types) { |
219 | WriteType(type&: custom_type); |
220 | } |
221 | |
222 | // write the sequences |
223 | for (auto &seq : sequences) { |
224 | WriteSequence(table&: seq); |
225 | } |
226 | // reorder tables because of foreign key constraint |
227 | ReorderTableEntries(tables); |
228 | // Write the tables |
229 | for (auto &table : tables) { |
230 | WriteTable(table); |
231 | } |
232 | // Write the views |
233 | for (auto &view : views) { |
234 | WriteView(table&: view); |
235 | } |
236 | |
237 | // Write the macros |
238 | for (auto ¯o : macros) { |
239 | WriteMacro(table&: macro); |
240 | } |
241 | |
242 | // Write the table's macros |
243 | for (auto ¯o : table_macros) { |
244 | WriteTableMacro(table&: macro); |
245 | } |
246 | // Write the indexes |
247 | for (auto &index : indexes) { |
248 | WriteIndex(index_catalog&: index); |
249 | } |
250 | } |
251 | |
252 | void CheckpointReader::ReadSchema(ClientContext &context, MetaBlockReader &reader) { |
253 | // read the schema and create it in the catalog |
254 | auto info = SchemaCatalogEntry::Deserialize(source&: reader); |
255 | // we set create conflict to ignore to ignore the failure of recreating the main schema |
256 | info->on_conflict = OnCreateConflict::IGNORE_ON_CONFLICT; |
257 | catalog.CreateSchema(context, info&: *info); |
258 | |
259 | // first read all the counts |
260 | FieldReader field_reader(reader); |
261 | uint32_t enum_count = field_reader.ReadRequired<uint32_t>(); |
262 | uint32_t seq_count = field_reader.ReadRequired<uint32_t>(); |
263 | uint32_t table_count = field_reader.ReadRequired<uint32_t>(); |
264 | uint32_t view_count = field_reader.ReadRequired<uint32_t>(); |
265 | uint32_t macro_count = field_reader.ReadRequired<uint32_t>(); |
266 | uint32_t table_macro_count = field_reader.ReadRequired<uint32_t>(); |
267 | uint32_t table_index_count = field_reader.ReadRequired<uint32_t>(); |
268 | field_reader.Finalize(); |
269 | |
270 | // now read the enums |
271 | for (uint32_t i = 0; i < enum_count; i++) { |
272 | ReadType(context, reader); |
273 | } |
274 | |
275 | // read the sequences |
276 | for (uint32_t i = 0; i < seq_count; i++) { |
277 | ReadSequence(context, reader); |
278 | } |
279 | // read the table count and recreate the tables |
280 | for (uint32_t i = 0; i < table_count; i++) { |
281 | ReadTable(context, reader); |
282 | } |
283 | // now read the views |
284 | for (uint32_t i = 0; i < view_count; i++) { |
285 | ReadView(context, reader); |
286 | } |
287 | |
288 | // finally read the macro's |
289 | for (uint32_t i = 0; i < macro_count; i++) { |
290 | ReadMacro(context, reader); |
291 | } |
292 | |
293 | for (uint32_t i = 0; i < table_macro_count; i++) { |
294 | ReadTableMacro(context, reader); |
295 | } |
296 | for (uint32_t i = 0; i < table_index_count; i++) { |
297 | ReadIndex(context, reader); |
298 | } |
299 | } |
300 | |
301 | //===--------------------------------------------------------------------===// |
302 | // Views |
303 | //===--------------------------------------------------------------------===// |
304 | void CheckpointWriter::WriteView(ViewCatalogEntry &view) { |
305 | view.Serialize(serializer&: GetMetaBlockWriter()); |
306 | } |
307 | |
308 | void CheckpointReader::ReadView(ClientContext &context, MetaBlockReader &reader) { |
309 | auto info = ViewCatalogEntry::Deserialize(source&: reader, context); |
310 | catalog.CreateView(context, info&: *info); |
311 | } |
312 | |
313 | //===--------------------------------------------------------------------===// |
314 | // Sequences |
315 | //===--------------------------------------------------------------------===// |
316 | void CheckpointWriter::WriteSequence(SequenceCatalogEntry &seq) { |
317 | seq.Serialize(serializer&: GetMetaBlockWriter()); |
318 | } |
319 | |
320 | void CheckpointReader::ReadSequence(ClientContext &context, MetaBlockReader &reader) { |
321 | auto info = SequenceCatalogEntry::Deserialize(source&: reader); |
322 | catalog.CreateSequence(context, info&: *info); |
323 | } |
324 | |
325 | //===--------------------------------------------------------------------===// |
326 | // Indexes |
327 | //===--------------------------------------------------------------------===// |
328 | void CheckpointWriter::WriteIndex(IndexCatalogEntry &index_catalog) { |
329 | // The index data should already have been written as part of WriteTableData. |
330 | // Here, we need only serialize the pointer to that data. |
331 | auto root_offset = index_catalog.index->GetSerializedDataPointer(); |
332 | auto &metadata_writer = GetMetaBlockWriter(); |
333 | index_catalog.Serialize(serializer&: metadata_writer); |
334 | // Serialize the Block id and offset of root node |
335 | metadata_writer.Write(element: root_offset.block_id); |
336 | metadata_writer.Write(element: root_offset.offset); |
337 | } |
338 | |
339 | void CheckpointReader::ReadIndex(ClientContext &context, MetaBlockReader &reader) { |
340 | // deserialize the index metadata |
341 | auto info = IndexCatalogEntry::Deserialize(source&: reader, context); |
342 | |
343 | // create the index in the catalog |
344 | auto &schema_catalog = catalog.GetSchema(context, name: info->schema); |
345 | auto &table_catalog = catalog.GetEntry(context, type: CatalogType::TABLE_ENTRY, schema: info->schema, name: info->table->table_name) |
346 | .Cast<DuckTableEntry>(); |
347 | auto &index_catalog = schema_catalog.CreateIndex(context, info&: *info, table&: table_catalog)->Cast<DuckIndexEntry>(); |
348 | index_catalog.info = table_catalog.GetStorage().info; |
349 | |
350 | // we deserialize the index lazily, i.e., we do not need to load any node information |
351 | // except the root block id and offset |
352 | auto root_block_id = reader.Read<block_id_t>(); |
353 | auto root_offset = reader.Read<uint32_t>(); |
354 | |
355 | // obtain the expressions of the ART from the index metadata |
356 | vector<unique_ptr<Expression>> unbound_expressions; |
357 | vector<unique_ptr<ParsedExpression>> parsed_expressions; |
358 | for (auto &p_exp : info->parsed_expressions) { |
359 | parsed_expressions.push_back(x: p_exp->Copy()); |
360 | } |
361 | |
362 | // bind the parsed expressions |
363 | // add the table to the bind context |
364 | auto binder = Binder::CreateBinder(context); |
365 | vector<LogicalType> column_types; |
366 | vector<string> column_names; |
367 | for (auto &col : table_catalog.GetColumns().Logical()) { |
368 | column_types.push_back(x: col.Type()); |
369 | column_names.push_back(x: col.Name()); |
370 | } |
371 | vector<column_t> column_ids; |
372 | binder->bind_context.AddBaseTable(index: 0, alias: info->table->table_name, names: column_names, types: column_types, bound_column_ids&: column_ids, |
373 | entry: &table_catalog); |
374 | IndexBinder idx_binder(*binder, context); |
375 | unbound_expressions.reserve(n: parsed_expressions.size()); |
376 | for (auto &expr : parsed_expressions) { |
377 | unbound_expressions.push_back(x: idx_binder.Bind(expr)); |
378 | } |
379 | |
380 | if (parsed_expressions.empty()) { |
381 | // this is a PK/FK index: we create the necessary bound column ref expressions |
382 | unbound_expressions.reserve(n: info->column_ids.size()); |
383 | for (idx_t key_nr = 0; key_nr < info->column_ids.size(); key_nr++) { |
384 | auto &col = table_catalog.GetColumn(idx: LogicalIndex(info->column_ids[key_nr])); |
385 | unbound_expressions.push_back( |
386 | x: make_uniq<BoundColumnRefExpression>(args: col.GetName(), args: col.GetType(), args: ColumnBinding(0, key_nr))); |
387 | } |
388 | } |
389 | |
390 | // create the index and add it to the storage |
391 | switch (info->index_type) { |
392 | case IndexType::ART: { |
393 | auto &storage = table_catalog.GetStorage(); |
394 | auto art = make_uniq<ART>(args&: info->column_ids, args&: TableIOManager::Get(table&: storage), args: std::move(unbound_expressions), |
395 | args&: info->constraint_type, args&: storage.db, args&: root_block_id, args&: root_offset); |
396 | index_catalog.index = art.get(); |
397 | storage.info->indexes.AddIndex(index: std::move(art)); |
398 | break; |
399 | } |
400 | default: |
401 | throw InternalException("Unknown index type for ReadIndex" ); |
402 | } |
403 | } |
404 | |
405 | //===--------------------------------------------------------------------===// |
406 | // Custom Types |
407 | //===--------------------------------------------------------------------===// |
408 | void CheckpointWriter::WriteType(TypeCatalogEntry &type) { |
409 | type.Serialize(serializer&: GetMetaBlockWriter()); |
410 | } |
411 | |
412 | void CheckpointReader::ReadType(ClientContext &context, MetaBlockReader &reader) { |
413 | auto info = TypeCatalogEntry::Deserialize(source&: reader); |
414 | auto &catalog_entry = catalog.CreateType(context, info&: *info)->Cast<TypeCatalogEntry>(); |
415 | if (info->type.id() == LogicalTypeId::ENUM) { |
416 | EnumType::SetCatalog(type&: info->type, catalog_entry: &catalog_entry); |
417 | } |
418 | } |
419 | |
420 | //===--------------------------------------------------------------------===// |
421 | // Macro's |
422 | //===--------------------------------------------------------------------===// |
423 | void CheckpointWriter::WriteMacro(ScalarMacroCatalogEntry ¯o) { |
424 | macro.Serialize(serializer&: GetMetaBlockWriter()); |
425 | } |
426 | |
427 | void CheckpointReader::ReadMacro(ClientContext &context, MetaBlockReader &reader) { |
428 | auto info = MacroCatalogEntry::Deserialize(main_source&: reader, context); |
429 | catalog.CreateFunction(context, info&: *info); |
430 | } |
431 | |
432 | void CheckpointWriter::WriteTableMacro(TableMacroCatalogEntry ¯o) { |
433 | macro.Serialize(serializer&: GetMetaBlockWriter()); |
434 | } |
435 | |
436 | void CheckpointReader::ReadTableMacro(ClientContext &context, MetaBlockReader &reader) { |
437 | auto info = MacroCatalogEntry::Deserialize(main_source&: reader, context); |
438 | catalog.CreateFunction(context, info&: *info); |
439 | } |
440 | |
441 | //===--------------------------------------------------------------------===// |
442 | // Table Metadata |
443 | //===--------------------------------------------------------------------===// |
444 | void CheckpointWriter::WriteTable(TableCatalogEntry &table) { |
445 | // write the table meta data |
446 | table.Serialize(serializer&: GetMetaBlockWriter()); |
447 | // now we need to write the table data. |
448 | if (auto writer = GetTableDataWriter(table)) { |
449 | writer->WriteTableData(); |
450 | } |
451 | } |
452 | |
453 | void CheckpointReader::ReadTable(ClientContext &context, MetaBlockReader &reader) { |
454 | // deserialize the table meta data |
455 | auto info = TableCatalogEntry::Deserialize(source&: reader, context); |
456 | // bind the info |
457 | auto binder = Binder::CreateBinder(context); |
458 | auto &schema = catalog.GetSchema(context, name: info->schema); |
459 | auto bound_info = binder->BindCreateTableInfo(info: std::move(info), schema); |
460 | |
461 | // now read the actual table data and place it into the create table info |
462 | ReadTableData(context, reader, bound_info&: *bound_info); |
463 | |
464 | // finally create the table in the catalog |
465 | catalog.CreateTable(context, info&: *bound_info); |
466 | } |
467 | |
468 | void CheckpointReader::ReadTableData(ClientContext &context, MetaBlockReader &reader, |
469 | BoundCreateTableInfo &bound_info) { |
470 | auto block_id = reader.Read<block_id_t>(); |
471 | auto offset = reader.Read<uint64_t>(); |
472 | |
473 | MetaBlockReader table_data_reader(reader.block_manager, block_id); |
474 | table_data_reader.offset = offset; |
475 | TableDataReader data_reader(table_data_reader, bound_info); |
476 | |
477 | data_reader.ReadTableData(); |
478 | bound_info.data->total_rows = reader.Read<idx_t>(); |
479 | |
480 | // Get any indexes block info |
481 | idx_t num_indexes = reader.Read<idx_t>(); |
482 | for (idx_t i = 0; i < num_indexes; i++) { |
483 | auto idx_block_id = reader.Read<idx_t>(); |
484 | auto idx_offset = reader.Read<idx_t>(); |
485 | bound_info.indexes.emplace_back(args&: idx_block_id, args&: idx_offset); |
486 | } |
487 | } |
488 | |
489 | } // namespace duckdb |
490 | |