1#include "duckdb/transaction/commit_state.hpp"
2
3#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
4#include "duckdb/catalog/catalog_entry/type_catalog_entry.hpp"
5#include "duckdb/catalog/catalog_set.hpp"
6#include "duckdb/catalog/duck_catalog.hpp"
7#include "duckdb/common/serializer/buffered_deserializer.hpp"
8#include "duckdb/storage/data_table.hpp"
9#include "duckdb/storage/table/chunk_info.hpp"
10#include "duckdb/storage/table/column_data.hpp"
11#include "duckdb/storage/table/row_group.hpp"
12#include "duckdb/storage/table/update_segment.hpp"
13#include "duckdb/storage/write_ahead_log.hpp"
14#include "duckdb/transaction/append_info.hpp"
15#include "duckdb/transaction/delete_info.hpp"
16#include "duckdb/transaction/update_info.hpp"
17#include "duckdb/catalog/catalog_entry/scalar_macro_catalog_entry.hpp"
18#include "duckdb/catalog/catalog_entry/view_catalog_entry.hpp"
19
namespace duckdb {

CommitState::CommitState(ClientContext &context, transaction_t commit_id, optional_ptr<WriteAheadLog> log)
    : log(log), commit_id(commit_id), current_table_info(nullptr), context(context) {
}

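// Emits a "set table" record to the WAL whenever the table targeted by the upcoming
// record differs from the table used for the previous record.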
void CommitState::SwitchTable(DataTableInfo *table_info, UndoFlags new_op) {
	if (current_table_info != table_info) {
		// write the current table to the log
		log->WriteSetTable(table_info->schema, table_info->table);
		current_table_info = table_info;
	}
}

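// Writes the WAL record for a committed catalog change. Here "entry" is the replaced
// version stored in the undo buffer and "entry.parent" is the newer version created by
// this transaction; the parent's type decides whether a CREATE, ALTER or DROP record is
// written. For ALTERs, "dataptr" points at extra data serialized after the entry.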
void CommitState::WriteCatalogEntry(CatalogEntry &entry, data_ptr_t dataptr) {
	if (entry.temporary || entry.parent->temporary) {
		return;
	}
	D_ASSERT(log);
	// look at the type of the parent entry
	auto parent = entry.parent;
	switch (parent->type) {
	case CatalogType::TABLE_ENTRY:
		if (entry.type == CatalogType::TABLE_ENTRY) {
			auto &table_entry = entry.Cast<DuckTableEntry>();
			D_ASSERT(table_entry.IsDuckTable());
			// ALTER TABLE statement, read the extra data after the entry
			auto extra_data_size = Load<idx_t>(dataptr);
			auto extra_data = data_ptr_cast(dataptr + sizeof(idx_t));

			BufferedDeserializer source(extra_data, extra_data_size);
			string column_name = source.Read<string>();

			if (!column_name.empty()) {
				// the ALTER affected a specific column: commit the removal of the old column data
				table_entry.CommitAlter(column_name);
			}

			// write the ALTER in the log
			log->WriteAlter(source.ptr, source.endptr - source.ptr);
		} else {
			// CREATE TABLE statement
			log->WriteCreateTable(parent->Cast<TableCatalogEntry>());
		}
		break;
	case CatalogType::SCHEMA_ENTRY:
		if (entry.type == CatalogType::SCHEMA_ENTRY) {
			// ALTER statement, skip it
			return;
		}
		log->WriteCreateSchema(parent->Cast<SchemaCatalogEntry>());
		break;
	case CatalogType::VIEW_ENTRY:
		if (entry.type == CatalogType::VIEW_ENTRY) {
			// ALTER VIEW statement, read the extra data after the entry
			auto extra_data_size = Load<idx_t>(dataptr);
			auto extra_data = data_ptr_cast(dataptr + sizeof(idx_t));
			// deserialize it
			BufferedDeserializer source(extra_data, extra_data_size);
			string column_name = source.Read<string>();
			// write the ALTER in the log
			log->WriteAlter(source.ptr, source.endptr - source.ptr);
		} else {
			log->WriteCreateView(parent->Cast<ViewCatalogEntry>());
		}
		break;
	case CatalogType::SEQUENCE_ENTRY:
		log->WriteCreateSequence(parent->Cast<SequenceCatalogEntry>());
		break;
	case CatalogType::MACRO_ENTRY:
		log->WriteCreateMacro(parent->Cast<ScalarMacroCatalogEntry>());
		break;
	case CatalogType::TABLE_MACRO_ENTRY:
		log->WriteCreateTableMacro(parent->Cast<TableMacroCatalogEntry>());
		break;
	case CatalogType::INDEX_ENTRY:
		log->WriteCreateIndex(parent->Cast<IndexCatalogEntry>());
		break;
	case CatalogType::TYPE_ENTRY:
		log->WriteCreateType(parent->Cast<TypeCatalogEntry>());
		break;
	case CatalogType::DELETED_ENTRY:
		switch (entry.type) {
		case CatalogType::TABLE_ENTRY: {
			auto &table_entry = entry.Cast<DuckTableEntry>();
			D_ASSERT(table_entry.IsDuckTable());
			table_entry.CommitDrop();
			log->WriteDropTable(table_entry);
			break;
		}
		case CatalogType::SCHEMA_ENTRY:
			log->WriteDropSchema(entry.Cast<SchemaCatalogEntry>());
			break;
		case CatalogType::VIEW_ENTRY:
			log->WriteDropView(entry.Cast<ViewCatalogEntry>());
			break;
		case CatalogType::SEQUENCE_ENTRY:
			log->WriteDropSequence(entry.Cast<SequenceCatalogEntry>());
			break;
		case CatalogType::MACRO_ENTRY:
			log->WriteDropMacro(entry.Cast<ScalarMacroCatalogEntry>());
			break;
		case CatalogType::TABLE_MACRO_ENTRY:
			log->WriteDropTableMacro(entry.Cast<TableMacroCatalogEntry>());
			break;
		case CatalogType::TYPE_ENTRY:
			log->WriteDropType(entry.Cast<TypeCatalogEntry>());
			break;
		case CatalogType::INDEX_ENTRY:
			log->WriteDropIndex(entry.Cast<IndexCatalogEntry>());
			break;
		case CatalogType::PREPARED_STATEMENT:
		case CatalogType::SCALAR_FUNCTION_ENTRY:
			// do nothing, prepared statements and scalar functions aren't persisted to disk
			break;
		default:
			throw InternalException("Don't know how to drop this type!");
		}
		break;
	case CatalogType::PREPARED_STATEMENT:
	case CatalogType::AGGREGATE_FUNCTION_ENTRY:
	case CatalogType::SCALAR_FUNCTION_ENTRY:
	case CatalogType::TABLE_FUNCTION_ENTRY:
	case CatalogType::COPY_FUNCTION_ENTRY:
	case CatalogType::PRAGMA_FUNCTION_ENTRY:
	case CatalogType::COLLATION_ENTRY:
		// do nothing, these entries are not persisted to disk
		break;
	default:
		throw InternalException("UndoBuffer - don't know how to write this entry to the WAL");
	}
}

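// Writes a committed deletion to the WAL: the deleted row identifiers are materialized
// into a lazily allocated (and reused) ROW_TYPE chunk and logged as a DELETE record.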
void CommitState::WriteDelete(DeleteInfo &info) {
	D_ASSERT(log);
	// switch to the current table, if necessary
	SwitchTable(info.table->info.get(), UndoFlags::DELETE_TUPLE);

	if (!delete_chunk) {
		delete_chunk = make_uniq<DataChunk>();
		vector<LogicalType> delete_types = {LogicalType::ROW_TYPE};
		delete_chunk->Initialize(Allocator::DefaultAllocator(), delete_types);
	}
	auto rows = FlatVector::GetData<row_t>(delete_chunk->data[0]);
	for (idx_t i = 0; i < info.count; i++) {
		rows[i] = info.base_row + info.rows[i];
	}
	delete_chunk->SetCardinality(info.count);
	log->WriteDelete(*delete_chunk);
}

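// Writes a committed update to the WAL: the updated values are fetched back from the
// segment, paired with their row identifiers, and logged together with the column index
// path (outermost column first) that identifies the updated (sub-)column.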
void CommitState::WriteUpdate(UpdateInfo &info) {
	D_ASSERT(log);
	// switch to the current table, if necessary
	auto &column_data = info.segment->column_data;
	auto &table_info = column_data.GetTableInfo();

	SwitchTable(&table_info, UndoFlags::UPDATE_TUPLE);

	// initialize the update chunk
	vector<LogicalType> update_types;
	if (column_data.type.id() == LogicalTypeId::VALIDITY) {
		update_types.emplace_back(LogicalType::BOOLEAN);
	} else {
		update_types.push_back(column_data.type);
	}
	update_types.emplace_back(LogicalType::ROW_TYPE);

	update_chunk = make_uniq<DataChunk>();
	update_chunk->Initialize(Allocator::DefaultAllocator(), update_types);

	// fetch the updated values from the base segment
	info.segment->FetchCommitted(info.vector_index, update_chunk->data[0]);

	// write the row ids into the chunk
	auto row_ids = FlatVector::GetData<row_t>(update_chunk->data[1]);
	idx_t start = column_data.start + info.vector_index * STANDARD_VECTOR_SIZE;
	for (idx_t i = 0; i < info.N; i++) {
		row_ids[info.tuples[i]] = start + info.tuples[i];
	}
	if (column_data.type.id() == LogicalTypeId::VALIDITY) {
		// zero-initialize the booleans
		// FIXME: this is only required because of NullValue<T> in Vector::Serialize...
		auto booleans = FlatVector::GetData<bool>(update_chunk->data[0]);
		for (idx_t i = 0; i < info.N; i++) {
			auto idx = info.tuples[i];
			booleans[idx] = false;
		}
	}
	SelectionVector sel(info.tuples);
	update_chunk->Slice(sel, info.N);

	// construct the column index path
	vector<column_t> column_indexes;
	reference<ColumnData> current_column_data = column_data;
	while (current_column_data.get().parent) {
		column_indexes.push_back(current_column_data.get().column_index);
		current_column_data = *current_column_data.get().parent;
	}
	column_indexes.push_back(info.column_index);
	std::reverse(column_indexes.begin(), column_indexes.end());

	log->WriteUpdate(*update_chunk, column_indexes);
}

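// Commits a single undo buffer entry by stamping it with the commit id. When HAS_LOG is
// set, the change is also written to the WAL; changes to temporary tables are never logged.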
template <bool HAS_LOG>
void CommitState::CommitEntry(UndoFlags type, data_ptr_t data) {
	switch (type) {
	case UndoFlags::CATALOG_ENTRY: {
		// set the commit timestamp of the catalog entry to the given id
		auto catalog_entry = Load<CatalogEntry *>(data);
		D_ASSERT(catalog_entry->parent);

		auto &catalog = catalog_entry->ParentCatalog();
		D_ASSERT(catalog.IsDuckCatalog());

		// Grab a write lock on the catalog
		auto &duck_catalog = catalog.Cast<DuckCatalog>();
		lock_guard<mutex> write_lock(duck_catalog.GetWriteLock());
		catalog_entry->set->UpdateTimestamp(*catalog_entry->parent, commit_id);
		if (catalog_entry->name != catalog_entry->parent->name) {
			catalog_entry->set->UpdateTimestamp(*catalog_entry, commit_id);
		}
		if (HAS_LOG) {
			// push the catalog update to the WAL
			WriteCatalogEntry(*catalog_entry, data + sizeof(CatalogEntry *));
		}
		break;
	}
	case UndoFlags::INSERT_TUPLE: {
		// append:
		auto info = reinterpret_cast<AppendInfo *>(data);
		if (HAS_LOG && !info->table->info->IsTemporary()) {
			info->table->WriteToLog(*log, info->start_row, info->count);
		}
		// mark the tuples as committed
		info->table->CommitAppend(commit_id, info->start_row, info->count);
		break;
	}
	case UndoFlags::DELETE_TUPLE: {
		// deletion:
		auto info = reinterpret_cast<DeleteInfo *>(data);
		if (HAS_LOG && !info->table->info->IsTemporary()) {
			WriteDelete(*info);
		}
		// mark the tuples as committed
		info->vinfo->CommitDelete(commit_id, info->rows, info->count);
		break;
	}
	case UndoFlags::UPDATE_TUPLE: {
		// update:
		auto info = reinterpret_cast<UpdateInfo *>(data);
		if (HAS_LOG && !info->segment->column_data.GetTableInfo().IsTemporary()) {
			WriteUpdate(*info);
		}
		info->version_number = commit_id;
		break;
	}
	default:
		throw InternalException("UndoBuffer - don't know how to commit this type!");
	}
}

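// Undoes the effect of CommitEntry by writing the (uncommitted) transaction id back in
// place of the commit id, typically when the commit cannot be completed after entries
// have already been stamped.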
void CommitState::RevertCommit(UndoFlags type, data_ptr_t data) {
	transaction_t transaction_id = commit_id;
	switch (type) {
	case UndoFlags::CATALOG_ENTRY: {
		// set the timestamp of the catalog entry back to the transaction id
		auto catalog_entry = Load<CatalogEntry *>(data);
		D_ASSERT(catalog_entry->parent);
		catalog_entry->set->UpdateTimestamp(*catalog_entry->parent, transaction_id);
		if (catalog_entry->name != catalog_entry->parent->name) {
			catalog_entry->set->UpdateTimestamp(*catalog_entry, transaction_id);
		}
		break;
	}
	case UndoFlags::INSERT_TUPLE: {
		auto info = reinterpret_cast<AppendInfo *>(data);
		// revert this append
		info->table->RevertAppend(info->start_row, info->count);
		break;
	}
	case UndoFlags::DELETE_TUPLE: {
		// deletion:
		auto info = reinterpret_cast<DeleteInfo *>(data);
		info->table->info->cardinality += info->count;
		// revert the commit by writing the (uncommitted) transaction_id back into the version info
		info->vinfo->CommitDelete(transaction_id, info->rows, info->count);
		break;
	}
	case UndoFlags::UPDATE_TUPLE: {
		// update:
		auto info = reinterpret_cast<UpdateInfo *>(data);
		info->version_number = transaction_id;
		break;
	}
	default:
		throw InternalException("UndoBuffer - don't know how to revert commit of this type!");
	}
}

template void CommitState::CommitEntry<true>(UndoFlags type, data_ptr_t data);
template void CommitState::CommitEntry<false>(UndoFlags type, data_ptr_t data);

} // namespace duckdb