1#include "duckdb/storage/checkpoint_manager.hpp"
2#include "duckdb/storage/block_manager.hpp"
3#include "duckdb/storage/meta_block_reader.hpp"
4
5#include "duckdb/common/serializer.hpp"
6#include "duckdb/common/vector_operations/vector_operations.hpp"
7#include "duckdb/common/types/null_value.hpp"
8
9#include "duckdb/catalog/catalog.hpp"
10#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
11#include "duckdb/catalog/catalog_entry/sequence_catalog_entry.hpp"
12#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
13#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
14
15#include "duckdb/parser/parsed_data/create_schema_info.hpp"
16#include "duckdb/parser/parsed_data/create_table_info.hpp"
17#include "duckdb/parser/parsed_data/create_view_info.hpp"
18
19#include "duckdb/planner/binder.hpp"
20#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
21
22#include "duckdb/main/client_context.hpp"
23#include "duckdb/main/database.hpp"
24
25#include "duckdb/transaction/transaction_manager.hpp"
26
27#include "duckdb/storage/checkpoint/table_data_writer.hpp"
28#include "duckdb/storage/checkpoint/table_data_reader.hpp"
29
30using namespace duckdb;
31using namespace std;
32
33// constexpr uint64_t CheckpointManager::DATA_BLOCK_HEADER_SIZE;
34
35CheckpointManager::CheckpointManager(StorageManager &manager)
36 : block_manager(*manager.block_manager), buffer_manager(*manager.buffer_manager), database(manager.database) {
37}
38
39void CheckpointManager::CreateCheckpoint() {
40 // assert that the checkpoint manager hasn't been used before
41 assert(!metadata_writer);
42
43 auto transaction = database.transaction_manager->StartTransaction();
44 block_manager.StartCheckpoint();
45
46 //! Set up the writers for the checkpoints
47 metadata_writer = make_unique<MetaBlockWriter>(block_manager);
48 tabledata_writer = make_unique<MetaBlockWriter>(block_manager);
49
50 // get the id of the first meta block
51 block_id_t meta_block = metadata_writer->block->id;
52
53 vector<SchemaCatalogEntry *> schemas;
54 // we scan the schemas
55 database.catalog->schemas.Scan(*transaction,
56 [&](CatalogEntry *entry) { schemas.push_back((SchemaCatalogEntry *)entry); });
57 // write the actual data into the database
58 // write the amount of schemas
59 metadata_writer->Write<uint32_t>(schemas.size());
60 for (auto &schema : schemas) {
61 WriteSchema(*transaction, *schema);
62 }
63 // flush the meta data to disk
64 metadata_writer->Flush();
65 tabledata_writer->Flush();
66
67 // finally write the updated header
68 DatabaseHeader header;
69 header.meta_block = meta_block;
70 block_manager.WriteHeader(header);
71}
72
73void CheckpointManager::LoadFromStorage() {
74 block_id_t meta_block = block_manager.GetMetaBlock();
75 if (meta_block < 0) {
76 // storage is empty
77 return;
78 }
79
80 ClientContext context(database);
81 context.transaction.BeginTransaction();
82 // create the MetaBlockReader to read from the storage
83 MetaBlockReader reader(buffer_manager, meta_block);
84 uint32_t schema_count = reader.Read<uint32_t>();
85 for (uint32_t i = 0; i < schema_count; i++) {
86 ReadSchema(context, reader);
87 }
88 context.transaction.Commit();
89}
90
91//===--------------------------------------------------------------------===//
92// Schema
93//===--------------------------------------------------------------------===//
94void CheckpointManager::WriteSchema(Transaction &transaction, SchemaCatalogEntry &schema) {
95 // write the schema data
96 schema.Serialize(*metadata_writer);
97 // then, we fetch the tables/views/sequences information
98 vector<TableCatalogEntry *> tables;
99 vector<ViewCatalogEntry *> views;
100 schema.tables.Scan(transaction, [&](CatalogEntry *entry) {
101 if (entry->type == CatalogType::TABLE) {
102 tables.push_back((TableCatalogEntry *)entry);
103 } else if (entry->type == CatalogType::VIEW) {
104 views.push_back((ViewCatalogEntry *)entry);
105 } else {
106 throw NotImplementedException("Catalog type for entries");
107 }
108 });
109 vector<SequenceCatalogEntry *> sequences;
110 schema.sequences.Scan(transaction,
111 [&](CatalogEntry *entry) { sequences.push_back((SequenceCatalogEntry *)entry); });
112
113 // write the sequences
114 metadata_writer->Write<uint32_t>(sequences.size());
115 for (auto &seq : sequences) {
116 WriteSequence(*seq);
117 }
118 // now write the tables
119 metadata_writer->Write<uint32_t>(tables.size());
120 for (auto &table : tables) {
121 WriteTable(transaction, *table);
122 }
123 // finally write the views
124 metadata_writer->Write<uint32_t>(views.size());
125 for (auto &view : views) {
126 WriteView(*view);
127 }
128}
129
130void CheckpointManager::ReadSchema(ClientContext &context, MetaBlockReader &reader) {
131 // read the schema and create it in the catalog
132 auto info = SchemaCatalogEntry::Deserialize(reader);
133 // we set create conflict to ignore to ignore the failure of recreating the main schema
134 info->on_conflict = OnCreateConflict::IGNORE;
135 database.catalog->CreateSchema(context, info.get());
136
137 // read the sequences
138 uint32_t seq_count = reader.Read<uint32_t>();
139 for (uint32_t i = 0; i < seq_count; i++) {
140 ReadSequence(context, reader);
141 }
142 // read the table count and recreate the tables
143 uint32_t table_count = reader.Read<uint32_t>();
144 for (uint32_t i = 0; i < table_count; i++) {
145 ReadTable(context, reader);
146 }
147 // finally read the views
148 uint32_t view_count = reader.Read<uint32_t>();
149 for (uint32_t i = 0; i < view_count; i++) {
150 ReadView(context, reader);
151 }
152}
153
154//===--------------------------------------------------------------------===//
155// Views
156//===--------------------------------------------------------------------===//
157void CheckpointManager::WriteView(ViewCatalogEntry &view) {
158 view.Serialize(*metadata_writer);
159}
160
161void CheckpointManager::ReadView(ClientContext &context, MetaBlockReader &reader) {
162 auto info = ViewCatalogEntry::Deserialize(reader);
163
164 database.catalog->CreateView(context, info.get());
165}
166
167//===--------------------------------------------------------------------===//
168// Sequences
169//===--------------------------------------------------------------------===//
170void CheckpointManager::WriteSequence(SequenceCatalogEntry &seq) {
171 seq.Serialize(*metadata_writer);
172}
173
174void CheckpointManager::ReadSequence(ClientContext &context, MetaBlockReader &reader) {
175 auto info = SequenceCatalogEntry::Deserialize(reader);
176
177 database.catalog->CreateSequence(context, info.get());
178}
179
180//===--------------------------------------------------------------------===//
181// Table Metadata
182//===--------------------------------------------------------------------===//
183void CheckpointManager::WriteTable(Transaction &transaction, TableCatalogEntry &table) {
184 // write the table meta data
185 table.Serialize(*metadata_writer);
186 //! write the blockId for the table info
187 metadata_writer->Write<block_id_t>(tabledata_writer->block->id);
188 //! and the offset to where the info starts
189 metadata_writer->Write<uint64_t>(tabledata_writer->offset);
190 // now we need to write the table data
191 TableDataWriter writer(*this, table);
192 writer.WriteTableData(transaction);
193}
194
195void CheckpointManager::ReadTable(ClientContext &context, MetaBlockReader &reader) {
196 // deserialize the table meta data
197 auto info = TableCatalogEntry::Deserialize(reader);
198 // bind the info
199 Binder binder(context);
200 auto bound_info = binder.BindCreateTableInfo(move(info));
201
202 // now read the actual table data and place it into the create table info
203 auto block_id = reader.Read<block_id_t>();
204 auto offset = reader.Read<uint64_t>();
205 MetaBlockReader table_data_reader(buffer_manager, block_id);
206 table_data_reader.offset = offset;
207 TableDataReader data_reader(*this, table_data_reader, *bound_info);
208 data_reader.ReadTableData();
209
210 // finally create the table in the catalog
211 database.catalog->CreateTable(context, bound_info.get());
212}
213