1#include "duckdb/storage/write_ahead_log.hpp"
2#include "duckdb/storage/data_table.hpp"
3#include "duckdb/common/serializer/buffered_file_reader.hpp"
4#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
5#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
6#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
7#include "duckdb/main/client_context.hpp"
8#include "duckdb/main/database.hpp"
9#include "duckdb/parser/parsed_data/alter_table_info.hpp"
10#include "duckdb/parser/parsed_data/drop_info.hpp"
11#include "duckdb/parser/parsed_data/create_schema_info.hpp"
12#include "duckdb/parser/parsed_data/create_table_info.hpp"
13#include "duckdb/parser/parsed_data/create_view_info.hpp"
14#include "duckdb/planner/binder.hpp"
15#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
16
17using namespace duckdb;
18using namespace std;
19
20class ReplayState {
21public:
22 ReplayState(DuckDB &db, ClientContext &context, Deserializer &source)
23 : db(db), context(context), source(source), current_table(nullptr) {
24 }
25
26 DuckDB &db;
27 ClientContext &context;
28 Deserializer &source;
29 TableCatalogEntry *current_table;
30
31public:
32 void ReplayEntry(WALType entry_type);
33
34private:
35 void ReplayCreateTable();
36 void ReplayDropTable();
37 void ReplayAlter();
38
39 void ReplayCreateView();
40 void ReplayDropView();
41
42 void ReplayCreateSchema();
43 void ReplayDropSchema();
44
45 void ReplayCreateSequence();
46 void ReplayDropSequence();
47 void ReplaySequenceValue();
48
49 void ReplayUseTable();
50 void ReplayInsert();
51 void ReplayDelete();
52 void ReplayUpdate();
53};
54
55void WriteAheadLog::Replay(DuckDB &database, string &path) {
56 BufferedFileReader reader(*database.file_system, path.c_str());
57
58 if (reader.Finished()) {
59 // WAL is empty
60 return;
61 }
62
63 ClientContext context(database);
64 context.transaction.SetAutoCommit(false);
65 context.transaction.BeginTransaction();
66
67 ReplayState state(database, context, reader);
68
69 // replay the WAL
70 // note that everything is wrapped inside a try/catch block here
71 // there can be errors in WAL replay because of a corrupt WAL file
72 // in this case we should throw a warning but startup anyway
73 try {
74 while (true) {
75 // read the current entry
76 WALType entry_type = reader.Read<WALType>();
77 if (entry_type == WALType::WAL_FLUSH) {
78 // flush: commit the current transaction
79 context.transaction.Commit();
80 context.transaction.SetAutoCommit(false);
81 // check if the file is exhausted
82 if (reader.Finished()) {
83 // we finished reading the file: break
84 break;
85 }
86 // otherwise we keep on reading
87 context.transaction.BeginTransaction();
88 } else {
89 // replay the entry
90 state.ReplayEntry(entry_type);
91 }
92 }
93 } catch (std::exception &ex) {
94 // FIXME: this report a proper warning in the connection
95 fprintf(stderr, "Exception in WAL playback: %s\n", ex.what());
96 // exception thrown in WAL replay: rollback
97 context.transaction.Rollback();
98 }
99}
100
101//===--------------------------------------------------------------------===//
102// Replay Entries
103//===--------------------------------------------------------------------===//
104void ReplayState::ReplayEntry(WALType entry_type) {
105 switch (entry_type) {
106 case WALType::CREATE_TABLE:
107 ReplayCreateTable();
108 break;
109 case WALType::DROP_TABLE:
110 ReplayDropTable();
111 break;
112 case WALType::ALTER_INFO:
113 ReplayAlter();
114 break;
115 case WALType::CREATE_VIEW:
116 ReplayCreateView();
117 break;
118 case WALType::DROP_VIEW:
119 ReplayDropView();
120 break;
121 case WALType::CREATE_SCHEMA:
122 ReplayCreateSchema();
123 break;
124 case WALType::DROP_SCHEMA:
125 ReplayDropSchema();
126 break;
127 case WALType::CREATE_SEQUENCE:
128 ReplayCreateSequence();
129 break;
130 case WALType::DROP_SEQUENCE:
131 ReplayDropSequence();
132 break;
133 case WALType::SEQUENCE_VALUE:
134 ReplaySequenceValue();
135 break;
136 case WALType::USE_TABLE:
137 ReplayUseTable();
138 break;
139 case WALType::INSERT_TUPLE:
140 ReplayInsert();
141 break;
142 case WALType::DELETE_TUPLE:
143 ReplayDelete();
144 break;
145 case WALType::UPDATE_TUPLE:
146 ReplayUpdate();
147 break;
148 default:
149 throw Exception("Invalid WAL entry type!");
150 }
151}
152
153//===--------------------------------------------------------------------===//
154// Replay Table
155//===--------------------------------------------------------------------===//
156void ReplayState::ReplayCreateTable() {
157 auto info = TableCatalogEntry::Deserialize(source);
158
159 // bind the constraints to the table again
160 Binder binder(context);
161 auto bound_info = binder.BindCreateTableInfo(move(info));
162
163 db.catalog->CreateTable(context, bound_info.get());
164}
165
166void ReplayState::ReplayDropTable() {
167 DropInfo info;
168
169 info.type = CatalogType::TABLE;
170 info.schema = source.Read<string>();
171 info.name = source.Read<string>();
172
173 db.catalog->DropEntry(context, &info);
174}
175
176void ReplayState::ReplayAlter() {
177 auto info = AlterInfo::Deserialize(source);
178 if (info->type != AlterType::ALTER_TABLE) {
179 throw Exception("Expected ALTER TABLE!");
180 }
181
182 db.catalog->AlterTable(context, (AlterTableInfo *)info.get());
183}
184
185//===--------------------------------------------------------------------===//
186// Replay View
187//===--------------------------------------------------------------------===//
188void ReplayState::ReplayCreateView() {
189 auto entry = ViewCatalogEntry::Deserialize(source);
190
191 db.catalog->CreateView(context, entry.get());
192}
193
194void ReplayState::ReplayDropView() {
195 DropInfo info;
196 info.type = CatalogType::VIEW;
197 info.schema = source.Read<string>();
198 info.name = source.Read<string>();
199 db.catalog->DropEntry(context, &info);
200}
201
202//===--------------------------------------------------------------------===//
203// Replay Schema
204//===--------------------------------------------------------------------===//
205void ReplayState::ReplayCreateSchema() {
206 CreateSchemaInfo info;
207 info.schema = source.Read<string>();
208
209 db.catalog->CreateSchema(context, &info);
210}
211
212void ReplayState::ReplayDropSchema() {
213 DropInfo info;
214
215 info.type = CatalogType::SCHEMA;
216 info.name = source.Read<string>();
217
218 db.catalog->DropEntry(context, &info);
219}
220
221//===--------------------------------------------------------------------===//
222// Replay Sequence
223//===--------------------------------------------------------------------===//
224void ReplayState::ReplayCreateSequence() {
225 auto entry = SequenceCatalogEntry::Deserialize(source);
226
227 db.catalog->CreateSequence(context, entry.get());
228}
229
230void ReplayState::ReplayDropSequence() {
231 DropInfo info;
232 info.type = CatalogType::SEQUENCE;
233 info.schema = source.Read<string>();
234 info.name = source.Read<string>();
235
236 db.catalog->DropEntry(context, &info);
237}
238
239void ReplayState::ReplaySequenceValue() {
240 auto schema = source.Read<string>();
241 auto name = source.Read<string>();
242 auto usage_count = source.Read<uint64_t>();
243 auto counter = source.Read<int64_t>();
244
245 // fetch the sequence from the catalog
246 auto seq = db.catalog->GetEntry<SequenceCatalogEntry>(context, schema, name);
247 if (usage_count > seq->usage_count) {
248 seq->usage_count = usage_count;
249 seq->counter = counter;
250 }
251}
252
253//===--------------------------------------------------------------------===//
254// Replay Data
255//===--------------------------------------------------------------------===//
256void ReplayState::ReplayUseTable() {
257 auto schema_name = source.Read<string>();
258 auto table_name = source.Read<string>();
259 current_table = db.catalog->GetEntry<TableCatalogEntry>(context, schema_name, table_name);
260}
261
262void ReplayState::ReplayInsert() {
263 if (!current_table) {
264 throw Exception("Corrupt WAL: insert without table");
265 }
266 DataChunk chunk;
267 chunk.Deserialize(source);
268
269 // append to the current table
270 current_table->storage->Append(*current_table, context, chunk);
271}
272
273void ReplayState::ReplayDelete() {
274 if (!current_table) {
275 throw Exception("Corrupt WAL: delete without table");
276 }
277 DataChunk chunk;
278 chunk.Deserialize(source);
279
280 assert(chunk.column_count() == 1 && chunk.data[0].type == ROW_TYPE);
281 row_t row_ids[1];
282 Vector row_identifiers(ROW_TYPE, (data_ptr_t)row_ids);
283
284 auto source_ids = FlatVector::GetData<row_t>(chunk.data[0]);
285 // delete the tuples from the current table
286 for (idx_t i = 0; i < chunk.size(); i++) {
287 row_ids[0] = source_ids[i];
288 current_table->storage->Delete(*current_table, context, row_identifiers, 1);
289 }
290}
291
292void ReplayState::ReplayUpdate() {
293 if (!current_table) {
294 throw Exception("Corrupt WAL: update without table");
295 }
296
297 idx_t column_index = source.Read<column_t>();
298
299 DataChunk chunk;
300 chunk.Deserialize(source);
301
302 vector<column_t> column_ids{column_index};
303 if (column_index >= current_table->columns.size()) {
304 throw Exception("Corrupt WAL: column index for update out of bounds");
305 }
306
307 // remove the row id vector from the chunk
308 auto row_ids = move(chunk.data.back());
309 chunk.data.pop_back();
310
311 // now perform the update
312 current_table->storage->Update(*current_table, context, row_ids, column_ids, chunk);
313}
314