1#include "duckdb/transaction/commit_state.hpp"
2#include "duckdb/transaction/delete_info.hpp"
3#include "duckdb/transaction/update_info.hpp"
4
5#include "duckdb/storage/data_table.hpp"
6#include "duckdb/storage/write_ahead_log.hpp"
7#include "duckdb/storage/uncompressed_segment.hpp"
8#include "duckdb/common/serializer/buffered_deserializer.hpp"
9#include "duckdb/parser/parsed_data/alter_table_info.hpp"
10
11using namespace duckdb;
12using namespace std;
13
14CommitState::CommitState(transaction_t commit_id, WriteAheadLog *log)
15 : log(log), commit_id(commit_id), current_table_info(nullptr) {
16}
17
18void CommitState::SwitchTable(DataTableInfo *table_info, UndoFlags new_op) {
19 if (current_table_info != table_info) {
20 // write the current table to the log
21 log->WriteSetTable(table_info->schema, table_info->table);
22 current_table_info = table_info;
23 }
24}
25
26void CommitState::WriteCatalogEntry(CatalogEntry *entry, data_ptr_t dataptr) {
27 assert(log);
28 // look at the type of the parent entry
29 auto parent = entry->parent;
30 switch (parent->type) {
31 case CatalogType::TABLE:
32 if (parent->temporary) {
33 return;
34 }
35 if (entry->type == CatalogType::TABLE) {
36 // ALTER TABLE statement, read the extra data after the entry
37 auto extra_data_size = *((idx_t *)dataptr);
38 auto extra_data = (data_ptr_t)(dataptr + sizeof(idx_t));
39 // deserialize it
40 BufferedDeserializer source(extra_data, extra_data_size);
41 auto info = AlterInfo::Deserialize(source);
42 // write the alter table in the log
43 log->WriteAlter(*info);
44 } else {
45 // CREATE TABLE statement
46 log->WriteCreateTable((TableCatalogEntry *)parent);
47 }
48 break;
49 case CatalogType::SCHEMA:
50 if (entry->type == CatalogType::SCHEMA) {
51 // ALTER TABLE statement, skip it
52 return;
53 }
54 log->WriteCreateSchema((SchemaCatalogEntry *)parent);
55 break;
56 case CatalogType::VIEW:
57 log->WriteCreateView((ViewCatalogEntry *)parent);
58 break;
59 case CatalogType::SEQUENCE:
60 log->WriteCreateSequence((SequenceCatalogEntry *)parent);
61 break;
62 case CatalogType::DELETED_ENTRY:
63 if (entry->type == CatalogType::TABLE) {
64 log->WriteDropTable((TableCatalogEntry *)entry);
65 } else if (entry->type == CatalogType::SCHEMA) {
66 log->WriteDropSchema((SchemaCatalogEntry *)entry);
67 } else if (entry->type == CatalogType::VIEW) {
68 log->WriteDropView((ViewCatalogEntry *)entry);
69 } else if (entry->type == CatalogType::SEQUENCE) {
70 log->WriteDropSequence((SequenceCatalogEntry *)entry);
71 } else if (entry->type == CatalogType::PREPARED_STATEMENT) {
72 // do nothing, we log the query to drop this
73 } else {
74 throw NotImplementedException("Don't know how to drop this type!");
75 }
76 break;
77
78 case CatalogType::INDEX:
79 case CatalogType::PREPARED_STATEMENT:
80 case CatalogType::AGGREGATE_FUNCTION:
81 case CatalogType::SCALAR_FUNCTION:
82 case CatalogType::TABLE_FUNCTION:
83
84 // do nothing, we log the query to recreate this
85 break;
86 default:
87 throw NotImplementedException("UndoBuffer - don't know how to write this entry to the WAL");
88 }
89}
90
91void CommitState::WriteDelete(DeleteInfo *info) {
92 assert(log);
93 // switch to the current table, if necessary
94 SwitchTable(info->table->info.get(), UndoFlags::DELETE_TUPLE);
95
96 if (!delete_chunk) {
97 delete_chunk = make_unique<DataChunk>();
98 vector<TypeId> delete_types = {ROW_TYPE};
99 delete_chunk->Initialize(delete_types);
100 }
101 auto rows = FlatVector::GetData<row_t>(delete_chunk->data[0]);
102 for (idx_t i = 0; i < info->count; i++) {
103 rows[i] = info->base_row + info->rows[i];
104 }
105 delete_chunk->SetCardinality(info->count);
106 log->WriteDelete(*delete_chunk);
107}
108
109void CommitState::WriteUpdate(UpdateInfo *info) {
110 assert(log);
111 // switch to the current table, if necessary
112 SwitchTable(&info->column_data->table_info, UndoFlags::UPDATE_TUPLE);
113
114 update_chunk = make_unique<DataChunk>();
115 vector<TypeId> update_types = {info->column_data->type, ROW_TYPE};
116 update_chunk->Initialize(update_types);
117
118 // fetch the updated values from the base table
119 ColumnScanState state;
120 info->segment->InitializeScan(state);
121 info->segment->Fetch(state, info->vector_index, update_chunk->data[0]);
122
123 // write the row ids into the chunk
124 auto row_ids = FlatVector::GetData<row_t>(update_chunk->data[1]);
125 idx_t start = info->segment->row_start + info->vector_index * STANDARD_VECTOR_SIZE;
126 for (idx_t i = 0; i < info->N; i++) {
127 row_ids[info->tuples[i]] = start + info->tuples[i];
128 }
129 SelectionVector sel(info->tuples);
130 update_chunk->Slice(sel, info->N);
131
132 log->WriteUpdate(*update_chunk, info->column_data->column_idx);
133}
134
135template <bool HAS_LOG> void CommitState::CommitEntry(UndoFlags type, data_ptr_t data) {
136 switch (type) {
137 case UndoFlags::CATALOG_ENTRY: {
138 // set the commit timestamp of the catalog entry to the given id
139 CatalogEntry *catalog_entry = *((CatalogEntry **)data);
140 assert(catalog_entry->parent);
141 catalog_entry->parent->timestamp = commit_id;
142
143 if (HAS_LOG) {
144 // push the catalog update to the WAL
145 WriteCatalogEntry(catalog_entry, data + sizeof(CatalogEntry *));
146 }
147 break;
148 }
149 case UndoFlags::DELETE_TUPLE: {
150 // deletion:
151 auto info = (DeleteInfo *)data;
152 info->table->info->cardinality -= info->count;
153 if (HAS_LOG && !info->table->info->IsTemporary()) {
154 WriteDelete(info);
155 }
156 // mark the tuples as committed
157 info->vinfo->CommitDelete(commit_id, info->rows, info->count);
158 break;
159 }
160 case UndoFlags::UPDATE_TUPLE: {
161 // update:
162 auto info = (UpdateInfo *)data;
163 if (HAS_LOG && !info->column_data->table_info.IsTemporary()) {
164 WriteUpdate(info);
165 }
166 info->version_number = commit_id;
167 break;
168 }
169 default:
170 throw NotImplementedException("UndoBuffer - don't know how to commit this type!");
171 }
172}
173
174void CommitState::RevertCommit(UndoFlags type, data_ptr_t data) {
175 transaction_t transaction_id = commit_id;
176 switch (type) {
177 case UndoFlags::CATALOG_ENTRY: {
178 // set the commit timestamp of the catalog entry to the given id
179 CatalogEntry *catalog_entry = *((CatalogEntry **)data);
180 assert(catalog_entry->parent);
181 catalog_entry->parent->timestamp = transaction_id;
182 break;
183 }
184 case UndoFlags::DELETE_TUPLE: {
185 // deletion:
186 auto info = (DeleteInfo *)data;
187 info->table->info->cardinality += info->count;
188 // revert the commit by writing the (uncommitted) transaction_id back into the version info
189 info->vinfo->CommitDelete(transaction_id, info->rows, info->count);
190 break;
191 }
192 case UndoFlags::UPDATE_TUPLE: {
193 // update:
194 auto info = (UpdateInfo *)data;
195 info->version_number = transaction_id;
196 break;
197 }
198 default:
199 throw NotImplementedException("UndoBuffer - don't know how to revert commit of this type!");
200 }
201}
202
203template void CommitState::CommitEntry<true>(UndoFlags type, data_ptr_t data);
204template void CommitState::CommitEntry<false>(UndoFlags type, data_ptr_t data);
205